refactor(logger): format logging (#12)

* feat(logger): format main logger

* feat(logger): use duration ms for gcp output

* refactor(logger): bitcoin node logger

* refactor(logger): indexer logger

* refactor(logger): fix cmd logger

* refactor(logger): logger in config pacakge

* refactor(logger): set pgx error log level debug

* refactor(logger): btcclient datasource

* refactor: processor name

* refactor(logger): runese logger

* refactor(logger): update logger

* fix(runes): wrong btc db datasource

* refactor(logger): remove unnecessary debug log

* refactor: update logger in indexer

* fix(logger): deadlock in load()

* fix: remove unused

---------

Co-authored-by: Gaze <gazenw@users.noreply.github.com>
This commit is contained in:
gazenw
2024-04-26 23:23:37 +07:00
committed by GitHub
parent 69ab16c35a
commit fe6988627a
17 changed files with 245 additions and 156 deletions

View File

@@ -44,7 +44,7 @@ func Execute(ctx context.Context) {
// Initialize logger
if err := logger.Init(config.Logger); err != nil {
logger.Panic("Failed to initialize logger: %v", slogx.Error(err), slog.Any("config", config.Logger))
logger.PanicContext(ctx, "Something went wrong, can't init logger", slogx.Error(err), slog.Any("config", config.Logger))
}
})
@@ -53,7 +53,7 @@ func Execute(ctx context.Context) {
// Execute command
if err := cmd.ExecuteContext(ctx); err != nil {
// use cobra to log error message by default
logger.Debug("Failed to execute root command", slogx.Error(err))
// Cobra will print the error message by default
logger.DebugContext(ctx, "Error executing command", slogx.Error(err))
}
}

View File

@@ -87,6 +87,13 @@ type HttpHandler interface {
func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
conf := config.Load()
// Validate inputs
{
if !conf.Network.IsSupported() {
return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
}
}
// Initialize application process context
ctx, stop := signal.NotifyContext(cmd.Context(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer stop()
@@ -96,7 +103,6 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
defer stopWorker()
// Add logger context
ctx = logger.WithContext(ctx, slogx.Stringer("network", conf.Network))
ctxWorker = logger.WithContext(ctxWorker, slogx.Stringer("network", conf.Network))
// Initialize Bitcoin Core RPC Client
@@ -108,19 +114,18 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
HTTPPostMode: true,
}, nil)
if err != nil {
logger.PanicContext(ctx, "Failed to create Bitcoin Core RPC Client", slogx.Error(err))
logger.PanicContext(ctx, "Invalid Bitcoin node configuration", slogx.Error(err))
}
defer client.Shutdown()
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
if err := client.Ping(); err != nil {
logger.PanicContext(ctx, "Failed to ping Bitcoin Core RPC Server", slogx.Error(err))
}
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server")
// Validate network
if !conf.Network.IsSupported() {
return errors.Wrapf(errs.Unsupported, "%q network is not supported", conf.Network.String())
// Check Bitcoin RPC connection
{
start := time.Now()
logger.InfoContext(ctx, "Connecting to Bitcoin Core RPC Server...", slogx.String("host", conf.BitcoinNode.Host))
if err := client.Ping(); err != nil {
logger.PanicContext(ctx, "Can't connect to Bitcoin Core RPC Server", slogx.String("host", conf.BitcoinNode.Host), slogx.Error(err))
}
logger.InfoContext(ctx, "Connected to Bitcoin Core RPC Server", slog.Duration("latency", time.Since(start)))
}
// TODO: create module command package.
@@ -133,12 +138,16 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
if !conf.Reporting.Disabled {
reportingClient, err = reportingclient.New(conf.Reporting)
if err != nil {
logger.PanicContext(ctx, "Failed to create reporting client", slogx.Error(err))
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid reporting configuration", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create reporting client", slogx.Error(err))
}
}
// Initialize Bitcoin Indexer
if opts.Bitcoin {
ctx := logger.WithContext(ctx, slogx.String("module", "bitcoin"))
var (
btcDB btcdatagateway.BitcoinDataGateway
indexerInfoDB btcdatagateway.IndexerInformationDataGateway
@@ -147,14 +156,17 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
if err != nil {
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
repo := btcpostgres.NewRepository(pg)
btcDB = repo
indexerInfoDB = repo
default:
return errors.Wrapf(errs.Unsupported, "%q database is not supported", conf.Modules.Bitcoin.Database)
return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Bitcoin.Database)
}
if !opts.APIOnly {
processor := bitcoin.NewProcessor(conf, btcDB, indexerInfoDB)
@@ -162,9 +174,10 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
indexer := indexers.NewBitcoinIndexer(processor, datasource)
defer func() {
if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown Bitcoin indexer", slogx.Error(err))
logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
return
}
logger.InfoContext(ctx, "Bitcoin indexer stopped gracefully")
logger.InfoContext(ctx, "Indexer stopped gracefully")
}()
// Verify states before running Indexer
@@ -177,9 +190,9 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// stop main process if indexer stopped
defer stop()
logger.InfoContext(ctx, "Starting Bitcoin Indexer")
logger.InfoContext(ctx, "Starting Gaze Indexer")
if err := indexer.Run(ctxWorker); err != nil {
logger.PanicContext(ctx, "Failed to run Bitcoin Indexer", slogx.Error(err))
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
}
}()
}
@@ -187,20 +200,26 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// Initialize Runes Indexer
if opts.Runes {
var runesDg runesdatagateway.RunesDataGateway
var indexerInfoDg runesdatagateway.IndexerInfoDataGateway
ctx := logger.WithContext(ctx, slogx.String("module", "runes"))
var (
runesDg runesdatagateway.RunesDataGateway
indexerInfoDg runesdatagateway.IndexerInfoDataGateway
)
switch strings.ToLower(conf.Modules.Runes.Database) {
case "postgresql", "postgres", "pg":
pg, err := postgres.NewPool(ctx, conf.Modules.Runes.Postgres)
if err != nil {
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for indexer", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
runesRepo := runespostgres.NewRepository(pg)
runesDg = runesRepo
indexerInfoDg = runesRepo
default:
logger.PanicContext(ctx, "Unsupported database", slogx.String("database", conf.Modules.Runes.Database))
return errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.Runes.Database)
}
var bitcoinDatasource indexers.BitcoinDatasource
var bitcoinClient btcclient.Contract
@@ -212,7 +231,10 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
case "database":
pg, err := postgres.NewPool(ctx, conf.Modules.Bitcoin.Postgres)
if err != nil {
logger.PanicContext(ctx, "Failed to create Postgres connection pool", slogx.Error(err))
if errors.Is(err, errs.InvalidArgument) {
logger.PanicContext(ctx, "Invalid Postgres configuration for datasource", slogx.Error(err))
}
logger.PanicContext(ctx, "Something went wrong, can't create Postgres connection pool", slogx.Error(err))
}
defer pg.Close()
btcRepo := btcpostgres.NewRepository(pg)
@@ -228,9 +250,10 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
indexer := indexers.NewBitcoinIndexer(processor, bitcoinDatasource)
defer func() {
if err := indexer.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown Runes indexer", slogx.Error(err))
logger.ErrorContext(ctx, "Error during shutdown indexer", slogx.Error(err))
return
}
logger.InfoContext(ctx, "Runes indexer stopped gracefully")
logger.InfoContext(ctx, "Indexer stopped gracefully")
}()
if err := processor.VerifyStates(ctx); err != nil {
@@ -242,9 +265,9 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// stop main process if indexer stopped
defer stop()
logger.InfoContext(ctx, "Started Runes Indexer")
logger.InfoContext(ctx, "Starting Gaze Indexer")
if err := indexer.Run(ctxWorker); err != nil {
logger.PanicContext(ctx, "Failed to run Runes Indexer", slogx.Error(err))
logger.PanicContext(ctx, "Something went wrong, error during running indexer", slogx.Error(err))
}
}()
}
@@ -258,7 +281,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
runesHTTPHandler := runesapi.NewHTTPHandler(conf.Network, runesUsecase)
httpHandlers["runes"] = runesHTTPHandler
default:
logger.PanicContext(ctx, "Unsupported API handler", slogx.String("handler", handler))
logger.PanicContext(ctx, "Something went wrong, unsupported API handler", slogx.String("handler", handler))
}
}
}
@@ -276,7 +299,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
StackTraceHandler: func(c *fiber.Ctx, e interface{}) {
buf := make([]byte, 1024) // bufLen = 1024
buf = buf[:runtime.Stack(buf, false)]
logger.ErrorContext(c.UserContext(), "panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
logger.ErrorContext(c.UserContext(), "Something went wrong, panic in http handler", slogx.Any("panic", e), slog.String("stacktrace", string(buf)))
},
})).
Use(compress.New(compress.Config{
@@ -286,6 +309,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
defer func() {
if err := app.ShutdownWithTimeout(shutdownTimeout); err != nil {
logger.ErrorContext(ctx, "Error during shutdown HTTP server", slogx.Error(err))
return
}
logger.InfoContext(ctx, "HTTP server stopped gracefully")
}()
@@ -298,7 +322,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
// mount http handlers from each http-enabled module
for module, handler := range httpHandlers {
if err := handler.Mount(app); err != nil {
logger.PanicContext(ctx, "Failed to mount HTTP handler", slogx.Error(err), slogx.String("module", module))
logger.PanicContext(ctx, "Something went wrong, can't mount HTTP handler", slogx.Error(err), slogx.String("module", module))
}
logger.InfoContext(ctx, "Mounted HTTP handler", slogx.String("module", module))
}
@@ -309,7 +333,7 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
logger.InfoContext(ctx, "Started HTTP server", slog.Int("port", conf.HTTPServer.Port))
if err := app.Listen(fmt.Sprintf(":%d", conf.HTTPServer.Port)); err != nil {
logger.PanicContext(ctx, "Failed to start HTTP server", slogx.Error(err))
logger.PanicContext(ctx, "Something went wrong, error during running HTTP server", slogx.Error(err))
}
}()
}
@@ -319,9 +343,11 @@ func runHandler(opts *runCmdOptions, cmd *cobra.Command, _ []string) error {
<-ctxWorker.Done()
defer stop()
logger.InfoContext(ctx, "Worker is stopped. Stopping main process...")
logger.InfoContext(ctx, "Gaze Indexer Worker is stopped. Stopping application...")
}()
logger.InfoContext(ctxWorker, "Gaze Indexer started")
// Wait for interrupt signal to gracefully stop the server
<-ctx.Done()

View File

@@ -39,7 +39,7 @@ func NewBitcoinNode(btcclient *rpcclient.Client) *BitcoinNodeDatasource {
}
func (p BitcoinNodeDatasource) Name() string {
return "BitcoinNode"
return "bitcoin_node"
}
// Fetch polling blocks from Bitcoin node
@@ -83,6 +83,11 @@ func (d *BitcoinNodeDatasource) Fetch(ctx context.Context, from, to int64) ([]*t
// - from: block height to start fetching, if -1, it will start from genesis block
// - to: block height to stop fetching, if -1, it will fetch until the latest block
func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
ctx = logger.WithContext(ctx,
slogx.String("package", "datasources"),
slogx.String("datasource", d.Name()),
)
from, to, skip, err := d.prepareRange(from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
@@ -138,10 +143,10 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
if errors.Is(err, errs.Closed) {
return
}
logger.WarnContext(ctx, "failed while dispatch block",
slogx.Error(err),
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
slogx.Int64("start", data[0].Header.Height),
slogx.Int64("end", data[len(data)-1].Header.Height),
slogx.Error(err),
)
}
case <-ctx.Done():
@@ -168,12 +173,11 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
stream.Go(func() []*types.Block {
startAt := time.Now()
defer func() {
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched blocks",
logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
slogx.Int("total_blocks", len(chunk)),
slogx.Int64("from", chunk[0]),
slogx.Int64("to", chunk[len(chunk)-1]),
slogx.Stringer("duration", time.Since(startAt)),
slogx.Int64("duration_ms", time.Since(startAt).Milliseconds()),
slogx.Duration("duration", time.Since(startAt)),
)
}()
// TODO: should concurrent fetch block or not ?
@@ -181,22 +185,21 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
for _, height := range chunk {
hash, err := d.btcclient.GetBlockHash(height)
if err != nil {
logger.ErrorContext(ctx, "failed to get block hash", slogx.Error(err), slogx.Int64("height", height))
logger.ErrorContext(ctx, "Can't get block hash from Bitcoin node rpc", slogx.Error(err), slogx.Int64("height", height))
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block hash: height: %d", height)); err != nil {
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return nil
}
block, err := d.btcclient.GetBlock(hash)
if err != nil {
logger.ErrorContext(ctx, "failed to get block", slogx.Error(err), slogx.Int64("height", height))
logger.ErrorContext(ctx, "Can't get block data from Bitcoin node rpc", slogx.Error(err), slogx.Int64("height", height))
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get block: height: %d, hash: %s", height, hash)); err != nil {
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return nil
}
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched block", slogx.Int64("height", height), slogx.String("hash", hash.String()))
blocks = append(blocks, types.ParseMsgBlock(block, height))
}

View File

@@ -48,6 +48,10 @@ func NewBitcoinIndexer(processor BitcoinProcessor, datasource BitcoinDatasource)
}
}
func (*BitcoinIndexer) Type() string {
return "bitcoin"
}
func (i *BitcoinIndexer) Shutdown() error {
return i.ShutdownWithContext(context.Background())
}
@@ -76,7 +80,8 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
defer close(i.done)
ctx = logger.WithContext(ctx,
slog.String("indexer", "bitcoin"),
slog.String("package", "indexers"),
slog.String("indexer", i.Type()),
slog.String("processor", i.Processor.Name()),
slog.String("datasource", i.Datasource.Name()),
)
@@ -90,7 +95,7 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
i.currentBlock.Height = -1
}
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for {
select {
@@ -101,21 +106,23 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
return nil
case <-ticker.C:
if err := i.process(ctx); err != nil {
logger.ErrorContext(ctx, "Failed while process", slogx.Error(err))
return errors.Wrap(err, "Failed while process")
logger.ErrorContext(ctx, "Indexer failed while processing", slogx.Error(err))
return errors.Wrap(err, "process failed")
}
logger.InfoContext(ctx, "Waiting for next round")
logger.DebugContext(ctx, "Waiting for next polling interval")
}
}
}
func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
ch := make(chan []*types.Block)
// height range to fetch data
from, to := i.currentBlock.Height+1, int64(-1)
logger.InfoContext(ctx, "Fetching blocks", slog.Int64("from", from), slog.Int64("to", to))
logger.InfoContext(ctx, "Start fetching bitcoin blocks", slog.Int64("from", from))
ch := make(chan []*types.Block)
subscription, err := i.Datasource.FetchAsync(ctx, from, to, ch)
if err != nil {
return errors.Wrap(err, "failed to call fetch async")
return errors.Wrap(err, "failed to fetch data")
}
defer subscription.Unsubscribe()
@@ -131,7 +138,6 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
startAt := time.Now()
ctx := logger.WithContext(ctx,
slog.Int("total_blocks", len(blocks)),
slogx.Int64("from", blocks[0].Header.Height),
slogx.Int64("to", blocks[len(blocks)-1].Header.Height),
)
@@ -140,7 +146,8 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
{
remoteBlockHeader := blocks[0].Header
if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
logger.WarnContext(ctx, "Reorg detected",
logger.WarnContext(ctx, "Detected chain reorganization. Searching for fork point...",
slogx.String("event", "reorg_detected"),
slogx.Stringer("current_hash", i.currentBlock.Hash),
slogx.Stringer("expected_hash", remoteBlockHeader.PrevBlock),
)
@@ -179,12 +186,15 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
return errors.Wrap(errs.SomethingWentWrong, "reorg look back limit reached")
}
// Revert all data since the reorg block
logger.WarnContext(ctx, "reverting reorg data",
slogx.Int64("reorg_from", beforeReorgBlockHeader.Height+1),
slogx.Int64("total_reorg_blocks", i.currentBlock.Height-beforeReorgBlockHeader.Height),
slogx.Stringer("detect_duration", time.Since(start)),
logger.InfoContext(ctx, "Found reorg fork point, starting to revert data...",
slogx.String("event", "reorg_forkpoint"),
slogx.Int64("since", beforeReorgBlockHeader.Height+1),
slogx.Int64("total_blocks", i.currentBlock.Height-beforeReorgBlockHeader.Height),
slogx.Duration("search_duration", time.Since(start)),
)
// Revert all data since the reorg block
start = time.Now()
if err := i.Processor.RevertData(ctx, beforeReorgBlockHeader.Height+1); err != nil {
return errors.Wrap(err, "failed to revert data")
}
@@ -192,10 +202,9 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
// Set current block to before reorg block and
// end current round to fetch again
i.currentBlock = beforeReorgBlockHeader
logger.Info("Reverted data successfully",
slogx.Any("current_block", i.currentBlock),
slogx.Stringer("duration", time.Since(start)),
slogx.Int64("duration_ms", time.Since(start).Milliseconds()),
logger.Info("Fixing chain reorganization completed",
slogx.Int64("current_block", i.currentBlock.Height),
slogx.Duration("duration", time.Since(start)),
)
return nil
}
@@ -208,12 +217,15 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
}
if !blocks[i].Header.PrevBlock.IsEqual(&blocks[i-1].Header.Hash) {
logger.WarnContext(ctx, "reorg occurred while batch fetching blocks, need to try to fetch again")
logger.WarnContext(ctx, "Chain Reorganization occurred in the middle of batch fetching blocks, need to try to fetch again")
// end current round
return nil
}
}
ctx = logger.WithContext(ctx, slog.Int("total_blocks", len(blocks)))
// Start processing blocks
logger.InfoContext(ctx, "Processing blocks")
if err := i.Processor.Process(ctx, blocks); err != nil {
@@ -224,8 +236,9 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
i.currentBlock = blocks[len(blocks)-1].Header
logger.InfoContext(ctx, "Processed blocks successfully",
slogx.Stringer("duration", time.Since(startAt)),
slogx.Int64("duration_ms", time.Since(startAt).Milliseconds()),
slogx.String("event", "processed_blocks"),
slogx.Int64("current_block", i.currentBlock.Height),
slogx.Duration("duration", time.Since(startAt)),
)
case <-subscription.Done():
// end current round

View File

@@ -7,7 +7,13 @@ import (
"github.com/gaze-network/indexer-network/core/types"
)
const (
// pollingInterval is the default polling interval for the indexer polling worker
pollingInterval = 15 * time.Second
)
type IndexerWorker interface {
Type() string
Run(ctx context.Context) error
Shutdown() error
ShutdownWithTimeout(timeout time.Duration) error

View File

@@ -18,8 +18,9 @@ import (
)
var (
configOnce sync.Once
config = &Config{
isInit bool
mu sync.Mutex
config = &Config{
Logger: logger.Config{
Output: "TEXT",
},
@@ -58,6 +59,38 @@ type HTTPServerConfig struct {
// Parse parse the configuration from environment variables
func Parse(configFile ...string) Config {
mu.Lock()
defer mu.Unlock()
return parse(configFile...)
}
// Load returns the loaded configuration
func Load() Config {
mu.Lock()
defer mu.Unlock()
if isInit {
return *config
}
return parse()
}
// BindPFlag binds a specific key to a pflag (as used by cobra).
// Example (where serverCmd is a Cobra instance):
//
// serverCmd.Flags().Int("port", 1138, "Port to run Application server on")
// Viper.BindPFlag("port", serverCmd.Flags().Lookup("port"))
func BindPFlag(key string, flag *pflag.Flag) {
if err := viper.BindPFlag(key, flag); err != nil {
logger.Panic("Something went wrong, failed to bind flag for config", slog.String("package", "config"), slogx.Error(err))
}
}
// SetDefault sets the default value for this key.
// SetDefault is case-insensitive for a key.
// Default only used when no value is provided by the user via flag, config or ENV.
func SetDefault(key string, value any) { viper.SetDefault(key, value) }
func parse(configFile ...string) Config {
ctx := logger.WithContext(context.Background(), slog.String("package", "config"))
if len(configFile) > 0 && configFile[0] != "" {
@@ -72,39 +105,16 @@ func Parse(configFile ...string) Config {
if err := viper.ReadInConfig(); err != nil {
var errNotfound viper.ConfigFileNotFoundError
if errors.As(err, &errNotfound) {
logger.WarnContext(ctx, "config file not found, use default value", slogx.Error(err))
logger.WarnContext(ctx, "Config file not found, use default config value", slogx.Error(err))
} else {
logger.PanicContext(ctx, "invalid config file", slogx.Error(err))
logger.PanicContext(ctx, "Invalid config file", slogx.Error(err))
}
}
if err := viper.Unmarshal(&config); err != nil {
logger.PanicContext(ctx, "failed to unmarshal config", slogx.Error(err))
logger.PanicContext(ctx, "Something went wrong, failed to unmarshal config", slogx.Error(err))
}
isInit = true
return *config
}
// Load returns the loaded configuration
func Load() Config {
configOnce.Do(func() {
_ = Parse()
})
return *config
}
// BindPFlag binds a specific key to a pflag (as used by cobra).
// Example (where serverCmd is a Cobra instance):
//
// serverCmd.Flags().Int("port", 1138, "Port to run Application server on")
// Viper.BindPFlag("port", serverCmd.Flags().Lookup("port"))
func BindPFlag(key string, flag *pflag.Flag) {
if err := viper.BindPFlag(key, flag); err != nil {
logger.Panic("Failed to bind pflag for config", slogx.Error(err))
}
}
// SetDefault sets the default value for this key.
// SetDefault is case-insensitive for a key.
// Default only used when no value is provided by the user via flag, config or ENV.
func SetDefault(key string, value any) { viper.SetDefault(key, value) }

View File

@@ -6,6 +6,7 @@ import (
"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
@@ -16,7 +17,7 @@ import (
const (
DefaultMaxConns = 16
DefaultMinConns = 0
DefaultLogLevel = tracelog.LogLevelError
DefaultLogLevel = tracelog.LogLevelDebug
)
type Config struct {
@@ -62,7 +63,7 @@ func NewPool(ctx context.Context, conf Config) (*pgxpool.Pool, error) {
// Prepare connection pool configuration
connConfig, err := pgxpool.ParseConfig(conf.String())
if err != nil {
return nil, errors.Wrap(err, "failed to parse config to create a new connection pool")
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "failed while parse config"))
}
connConfig.MaxConns = utils.Default(conf.MaxConns, DefaultMaxConns)
connConfig.MinConns = utils.Default(conf.MinConns, DefaultMinConns)

View File

@@ -37,13 +37,13 @@ func NewClientDatabase(bitcoinDg datagateway.BitcoinDataGateway) *ClientDatabase
}
}
func (c ClientDatabase) Name() string {
return "BitcoinDatabase"
func (d ClientDatabase) Name() string {
return "bitcoin_database"
}
func (c *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
func (d *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Block, error) {
ch := make(chan []*types.Block)
subscription, err := c.FetchAsync(ctx, from, to, ch)
subscription, err := d.FetchAsync(ctx, from, to, ch)
if err != nil {
return nil, errors.WithStack(err)
}
@@ -73,8 +73,13 @@ func (c *ClientDatabase) Fetch(ctx context.Context, from, to int64) ([]*types.Bl
}
}
func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
from, to, skip, err := c.prepareRange(ctx, from, to)
func (d *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*subscription.ClientSubscription[[]*types.Block], error) {
ctx = logger.WithContext(ctx,
slogx.String("package", "datasources"),
slogx.String("datasource", d.Name()),
)
from, to, skip, err := d.prepareRange(ctx, from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
@@ -129,10 +134,10 @@ func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
if errors.Is(err, errs.Closed) {
return
}
logger.WarnContext(ctx, "failed while dispatch block",
slogx.Error(err),
logger.WarnContext(ctx, "Failed to send bitcoin blocks to subscription client",
slogx.Int64("start", data[0].Header.Height),
slogx.Int64("end", data[len(data)-1].Header.Height),
slogx.Error(err),
)
}
case <-ctx.Done():
@@ -159,16 +164,26 @@ func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
continue
}
stream.Go(func() []*types.Block {
startAt := time.Now()
defer func() {
logger.DebugContext(ctx, "Fetched chunk of blocks from Bitcoin node",
slogx.Int("total_blocks", len(chunk)),
slogx.Int64("from", chunk[0]),
slogx.Int64("to", chunk[len(chunk)-1]),
slogx.Duration("duration", time.Since(startAt)),
)
}()
fromHeight, toHeight := chunk[0], chunk[len(chunk)-1]
blocks, err := c.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
blocks, err := d.bitcoinDg.GetBlocksByHeightRange(ctx, fromHeight, toHeight)
if err != nil {
logger.ErrorContext(ctx, "failed to get blocks",
logger.ErrorContext(ctx, "Can't get block data from Bitcoin database",
slogx.Error(err),
slogx.Int64("from_height", fromHeight),
slogx.Int64("to_height", toHeight),
slogx.Int64("from", fromHeight),
slogx.Int64("to", toHeight),
)
if err := subscription.SendError(ctx, errors.Wrapf(err, "failed to get blocks: from_height: %d, to_height: %d", fromHeight, toHeight)); err != nil {
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
logger.WarnContext(ctx, "Failed to send datasource error to subscription client", slogx.Error(err))
}
return nil
}

View File

@@ -29,7 +29,7 @@ func NewProcessor(config config.Config, bitcoinDg datagateway.BitcoinDataGateway
}
func (p Processor) Name() string {
return "Bitcoin"
return "bitcoin"
}
func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {

View File

@@ -16,6 +16,7 @@ import (
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/runes"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/gaze-network/indexer-network/pkg/reportingclient"
"github.com/gaze-network/uint128"
"github.com/samber/lo"
@@ -158,7 +159,7 @@ func (p *Processor) ensureGenesisRune(ctx context.Context) error {
}
func (p *Processor) Name() string {
return "Runes"
return "runes"
}
func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
@@ -192,7 +193,10 @@ func (p *Processor) RevertData(ctx context.Context, from int64) error {
}
defer func() {
if err := runesDgTx.Rollback(ctx); err != nil {
logger.ErrorContext(ctx, "failed to rollback transaction", err)
logger.WarnContext(ctx, "failed to rollback transaction",
slogx.Error(err),
slogx.String("event", "rollback_runes_revert"),
)
}
}()

View File

@@ -25,17 +25,20 @@ import (
func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error {
for _, block := range blocks {
ctx := logger.WithContext(ctx, slog.Int("block_height", int(block.Header.Height)))
logger.DebugContext(ctx, "[RunesProcessor] Processing block", slog.Int("txs", len(block.Transactions)))
ctx := logger.WithContext(ctx, slog.Int64("height", block.Header.Height))
logger.DebugContext(ctx, "Processing new block", slog.Int("txs", len(block.Transactions)))
for _, tx := range block.Transactions {
if err := p.processTx(ctx, tx, block.Header); err != nil {
return errors.Wrap(err, "failed to process tx")
}
}
if err := p.flushBlock(ctx, block.Header); err != nil {
return errors.Wrap(err, "failed to flush block")
}
logger.DebugContext(ctx, "Inserted new block")
}
return nil
}
@@ -66,13 +69,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
for runeId, balance := range balances {
unallocated[runeId] = unallocated[runeId].Add(balance.Amount)
p.newSpendOutPoints = append(p.newSpendOutPoints, balance.OutPoint)
logger.DebugContext(ctx, "[RunesProcessor] Found runes in tx input",
slogx.Any("runeId", runeId),
slogx.Stringer("amount", balance.Amount),
slogx.Stringer("txHash", balance.OutPoint.Hash),
slog.Int("txOutIndex", int(balance.OutPoint.Index)),
slog.Int("blockHeight", int(tx.BlockHeight)),
)
}
}
@@ -92,13 +88,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
}
allocated[output][runeId] = allocated[output][runeId].Add(amount)
unallocated[runeId] = unallocated[runeId].Sub(amount)
logger.DebugContext(ctx, "[RunesProcessor] Allocated runes to tx output",
slogx.Any("runeId", runeId),
slogx.Stringer("amount", amount),
slog.Int("output", output),
slogx.Stringer("txHash", tx.TxHash),
slog.Int("blockHeight", int(tx.BlockHeight)),
)
}
mints := make(map[runes.RuneId]uint128.Uint128)
@@ -131,7 +120,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
if !premine.IsZero() {
unallocated[etchedRuneId] = unallocated[etchedRuneId].Add(premine)
mints[etchedRuneId] = mints[etchedRuneId].Add(premine)
logger.DebugContext(ctx, "[RunesProcessor] Minted premine", slogx.Any("runeId", etchedRuneId), slogx.Stringer("amount", premine))
}
}
@@ -206,7 +194,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// all input runes and minted runes in a tx with cenotaph are burned
for runeId, amount := range unallocated {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes in cenotaph", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
} else {
// assign all un-allocated runes to the default output (pointer), or the first non
@@ -236,7 +223,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// if pointer is still nil, then no output is available. Burn all unallocated runes.
for runeId, amount := range unallocated {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes to no output", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
}
}
@@ -247,7 +233,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
// burn all allocated runes to OP_RETURN outputs
for runeId, amount := range balances {
burns[runeId] = burns[runeId].Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] Burned runes to OP_RETURN output", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount))
}
continue
}
@@ -316,7 +301,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
}
}
p.newRuneTxs = append(p.newRuneTxs, &runeTx)
logger.DebugContext(ctx, "[RunesProcessor] created RuneTransaction", slogx.Any("runeTx", runeTx))
return nil
}
@@ -413,7 +397,6 @@ func (p *Processor) mint(ctx context.Context, runeId runes.RuneId, blockHeader t
if err := p.incrementMintCount(ctx, runeId, blockHeader); err != nil {
return uint128.Zero, errors.Wrap(err, "failed to increment mint count")
}
logger.DebugContext(ctx, "[RunesProcessor] Minted rune", slogx.Any("runeId", runeId), slogx.Stringer("amount", amount), slogx.Stringer("mintCount", runeEntry.Mints), slogx.Stringer("cap", lo.FromPtr(runeEntry.Terms.Cap)))
return amount, nil
}
@@ -425,11 +408,9 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
if rune != nil {
minimumRune := runes.MinimumRuneAtHeight(p.network, uint64(tx.BlockHeight))
if rune.Cmp(minimumRune) < 0 {
logger.DebugContext(ctx, "invalid etching: rune is lower than minimum rune at this height", slogx.Any("rune", rune), slogx.Any("minimumRune", minimumRune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
if rune.IsReserved() {
logger.DebugContext(ctx, "invalid etching: rune is reserved", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
@@ -438,7 +419,6 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
return nil, runes.RuneId{}, runes.Rune{}, errors.Wrap(err, "error during check rune existence")
}
if ok {
logger.DebugContext(ctx, "invalid etching: rune already exists", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
@@ -448,7 +428,6 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
return nil, runes.RuneId{}, runes.Rune{}, errors.Wrap(err, "error during check tx commits to rune")
}
if !commit {
logger.DebugContext(ctx, "invalid etching: tx does not commit to the rune", slogx.Any("rune", rune))
return nil, runes.RuneId{}, runes.Rune{}, nil
}
} else {
@@ -464,7 +443,7 @@ func (p *Processor) getEtchedRune(ctx context.Context, tx *types.Transaction, ru
func (p *Processor) txCommitsToRune(ctx context.Context, tx *types.Transaction, rune runes.Rune) (bool, error) {
commitment := rune.Commitment()
for _, txIn := range tx.TxIn {
for i, txIn := range tx.TxIn {
tapscript, ok := extractTapScript(txIn.Witness)
if !ok {
continue
@@ -492,8 +471,7 @@ func (p *Processor) txCommitsToRune(ctx context.Context, tx *types.Transaction,
continue
}
if err != nil {
logger.ErrorContext(ctx, "failed to get pk script at out point", err)
continue
return false, errors.Wrapf(err, "can't get previous txout for txin `%v:%v`", tx.TxHash.String(), i)
}
pkScript := prevTx.TxOut[txIn.PreviousOutIndex].PkScript
// input utxo must be P2TR
@@ -576,7 +554,6 @@ func (p *Processor) createRuneEntry(ctx context.Context, runestone *runes.Runest
}
p.newRuneEntries[runeId] = runeEntry
p.newRuneEntryStates[runeId] = runeEntry
logger.DebugContext(ctx, "[RunesProcessor] created RuneEntry", slogx.Any("runeEntry", runeEntry))
return nil
}
@@ -630,7 +607,6 @@ func (p *Processor) incrementBurnedAmount(ctx context.Context, burned map[runes.
continue
}
runeEntry.BurnedAmount = runeEntry.BurnedAmount.Add(amount)
logger.DebugContext(ctx, "[RunesProcessor] burned amount incremented", slogx.Any("runeId", runeId), slogx.Any("amount", amount))
p.newRuneEntryStates[runeId] = runeEntry
}
return nil
@@ -698,7 +674,10 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
}
defer func() {
if err := runesDgTx.Rollback(ctx); err != nil {
logger.ErrorContext(ctx, "[RunesProcessor] failed to rollback runes tx", err)
logger.WarnContext(ctx, "failed to rollback transaction",
slogx.Error(err),
slogx.String("event", "rollback_runes_insertion"),
)
}
}()
@@ -824,6 +803,5 @@ func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeade
return errors.Wrap(err, "failed to submit block report")
}
}
logger.InfoContext(ctx, "[RunesProcessor] block flushed")
return nil
}

View File

@@ -55,7 +55,7 @@ func (r *Repository) Rollback(ctx context.Context) error {
return errors.Wrap(err, "failed to rollback transaction")
}
if err == nil {
logger.InfoContext(ctx, "rolled back transaction")
logger.DebugContext(ctx, "rolled back transaction")
}
r.tx = nil
return nil

View File

@@ -21,7 +21,11 @@ func NewHTTPErrorHandler() func(ctx *fiber.Ctx, err error) error {
return errors.WithStack(ctx.Status(e.Code).SendString(e.Error()))
}
logger.ErrorContext(ctx.UserContext(), "unhandled error", slogx.Error(err))
logger.ErrorContext(ctx.UserContext(), "Something went wrong, unhandled api error",
slogx.String("event", "api_unhandled_error"),
slogx.Error(err),
)
return errors.WithStack(ctx.Status(http.StatusInternalServerError).JSON(map[string]any{
"error": "Internal Server Error",
}))

View File

@@ -10,6 +10,7 @@ import (
"github.com/Cleverse/go-utilities/utils"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/valyala/fasthttp"
)
@@ -29,7 +30,7 @@ type Client struct {
func New(baseURL string, config ...Config) (*Client, error) {
if _, err := url.Parse(baseURL); err != nil {
return nil, errors.Wrap(err, "can't parse base url")
return nil, errors.Join(errs.InvalidArgument, errors.Wrap(err, "can't parse base url"))
}
var cf Config
if len(config) > 0 {
@@ -114,7 +115,7 @@ func (h *Client) request(ctx context.Context, reqOptions RequestOptions) (*HttpR
)
}
logger.Info("Finished make request")
logger.InfoContext(ctx, "Finished make request", slog.String("package", "httpclient"))
}
fasthttp.ReleaseResponse(resp)

15
pkg/logger/duration.go Normal file
View File

@@ -0,0 +1,15 @@
package logger
import (
"log/slog"
)
func durationToMsAttrReplacer(groups []string, attr slog.Attr) slog.Attr {
if attr.Value.Kind() == slog.KindDuration {
return slog.Attr{
Key: attr.Key,
Value: slog.Int64Value(attr.Value.Duration().Milliseconds()),
}
}
return attr
}

View File

@@ -7,12 +7,18 @@ import (
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
)
// NewGCPHandler returns a new GCP handler.
// The handler writes logs to the os.Stdout and
// replaces the default attribute keys/values with the GCP logging attribute keys/values
//
// https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry
func NewGCPHandler(opts *slog.HandlerOptions) slog.Handler {
return slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
AddSource: true,
Level: opts.Level,
ReplaceAttr: attrReplacerChain(
GCPAttrReplacer,
durationToMsAttrReplacer,
opts.ReplaceAttr,
),
})

View File

@@ -56,6 +56,8 @@ type SubmitBlockReportPayload struct {
}
func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitBlockReportPayload) error {
ctx = logger.WithContext(ctx, slog.String("package", "reporting_client"), slog.Any("payload", payload))
body, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "can't marshal payload")
@@ -67,9 +69,11 @@ func (r *ReportingClient) SubmitBlockReport(ctx context.Context, payload SubmitB
return errors.Wrap(err, "can't send request")
}
if resp.StatusCode() >= 400 {
logger.WarnContext(ctx, "failed to submit block report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
// TODO: unmashal response body and log it
logger.WarnContext(ctx, "Reporting block event failed", slog.Any("resp_body", resp.Body()))
return nil
}
logger.DebugContext(ctx, "block report submitted", slog.Any("payload", payload))
logger.DebugContext(ctx, "Reported block event")
return nil
}
@@ -89,6 +93,9 @@ func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, n
WebsiteURL: r.config.WebsiteURL,
IndexerAPIURL: r.config.IndexerAPIURL,
}
ctx = logger.WithContext(ctx, slog.String("package", "reporting_client"), slog.Any("payload", payload))
body, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "can't marshal payload")
@@ -100,8 +107,8 @@ func (r *ReportingClient) SubmitNodeReport(ctx context.Context, module string, n
return errors.Wrap(err, "can't send request")
}
if resp.StatusCode() >= 400 {
logger.WarnContext(ctx, "failed to submit node report", slog.Any("payload", payload), slog.Any("responseBody", resp.Body()))
logger.WarnContext(ctx, "Reporting node info failed", slog.Any("resp_body", resp.Body()))
}
logger.InfoContext(ctx, "node report submitted", slog.Any("payload", payload))
logger.DebugContext(ctx, "Reported node info")
return nil
}