fix: optimize processing

This commit is contained in:
Gaze
2024-05-29 14:43:37 +07:00
parent bb03d439f5
commit db209f68ad
23 changed files with 634 additions and 142 deletions

View File

@@ -17,6 +17,7 @@ import (
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/indexer"
"github.com/gaze-network/indexer-network/internal/config"
"github.com/gaze-network/indexer-network/modules/brc20"
"github.com/gaze-network/indexer-network/modules/runes"
"github.com/gaze-network/indexer-network/pkg/automaxprocs"
"github.com/gaze-network/indexer-network/pkg/errorhandler"
@@ -34,6 +35,7 @@ import (
// Register Modules
var Modules = do.Package(
do.LazyNamed("runes", runes.New),
do.LazyNamed("brc20", brc20.New),
)
func NewRunCommand() *cobra.Command {

View File

@@ -8,6 +8,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
brc20config "github.com/gaze-network/indexer-network/modules/brc20/config"
runesconfig "github.com/gaze-network/indexer-network/modules/runes/config"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
@@ -60,6 +61,7 @@ type BitcoinNodeClient struct {
type Modules struct {
Runes runesconfig.Config `mapstructure:"runes"`
BRC20 brc20config.Config `mapstructure:"brc20"`
}
type HTTPServerConfig struct {

71
modules/brc20/brc20.go Normal file
View File

@@ -0,0 +1,71 @@
package brc20
import (
"context"
"strings"
"github.com/btcsuite/btcd/rpcclient"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/datasources"
"github.com/gaze-network/indexer-network/core/indexer"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/config"
"github.com/gaze-network/indexer-network/internal/postgres"
"github.com/gaze-network/indexer-network/modules/brc20/internal/datagateway"
brc20postgres "github.com/gaze-network/indexer-network/modules/brc20/internal/repository/postgres"
"github.com/gaze-network/indexer-network/pkg/btcclient"
"github.com/pkg/errors"
"github.com/samber/do/v2"
)
// New constructs the BRC20 indexer worker from the DI container.
//
// Based on configuration it wires up the database-backed data gateways
// (currently only Postgres) and the bitcoin datasource/client (currently
// only "bitcoin-node"), builds the processor, verifies persisted indexer
// state, and returns the assembled indexer.
func New(injector do.Injector) (indexer.IndexerWorker, error) {
	ctx := do.MustInvoke[context.Context](injector)
	conf := do.MustInvoke[config.Config](injector)
	// reportingClient := do.MustInvoke[*reportingclient.ReportingClient](injector)

	// cleanupFuncs are handed to the processor so resources (e.g. the pgx
	// pool) can be released on shutdown.
	cleanupFuncs := make([]func(context.Context) error, 0)
	var brc20Dg datagateway.BRC20DataGateway
	var indexerInfoDg datagateway.IndexerInfoDataGateway
	switch strings.ToLower(conf.Modules.BRC20.Database) {
	case "postgresql", "postgres", "pg":
		pg, err := postgres.NewPool(ctx, conf.Modules.BRC20.Postgres)
		if err != nil {
			if errors.Is(err, errs.InvalidArgument) {
				return nil, errors.Wrap(err, "Invalid Postgres configuration for indexer")
			}
			return nil, errors.Wrap(err, "can't create Postgres connection pool")
		}
		cleanupFuncs = append(cleanupFuncs, func(ctx context.Context) error {
			pg.Close()
			return nil
		})
		// The Postgres repository implements both data gateways.
		brc20Repo := brc20postgres.NewRepository(pg)
		brc20Dg = brc20Repo
		indexerInfoDg = brc20Repo
	default:
		return nil, errors.Wrapf(errs.Unsupported, "%q database for indexer is not supported", conf.Modules.BRC20.Database)
	}
	var bitcoinDatasource datasources.Datasource[*types.Block]
	var bitcoinClient btcclient.Contract
	switch strings.ToLower(conf.Modules.BRC20.Datasource) {
	case "bitcoin-node":
		btcClient := do.MustInvoke[*rpcclient.Client](injector)
		bitcoinNodeDatasource := datasources.NewBitcoinNode(btcClient)
		bitcoinDatasource = bitcoinNodeDatasource
		bitcoinClient = bitcoinNodeDatasource
	default:
		return nil, errors.Wrapf(errs.Unsupported, "%q datasource is not supported", conf.Modules.BRC20.Datasource)
	}
	// NOTE(review): transferCountLimit is hard-coded to 2 here; consider
	// moving it to configuration.
	processor, err := NewProcessor(brc20Dg, indexerInfoDg, bitcoinClient, conf.Network, 2, cleanupFuncs)
	if err != nil {
		return nil, errors.WithStack(err)
	}
	if err := processor.VerifyStates(ctx); err != nil {
		return nil, errors.WithStack(err)
	}
	// Use a distinct name so the imported "indexer" package is not shadowed.
	worker := indexer.New(processor, bitcoinDatasource)
	return worker, nil
}

View File

@@ -0,0 +1,10 @@
package config
import "github.com/gaze-network/indexer-network/internal/postgres"
// Config holds the configuration for the BRC20 module, populated from the
// "modules.brc20" section of the application config via mapstructure.
type Config struct {
Datasource string `mapstructure:"datasource"` // Datasource to fetch bitcoin data for Meta-Protocol e.g. `bitcoin-node`
Database string `mapstructure:"database"` // Database to store data.
APIHandlers []string `mapstructure:"api_handlers"` // List of API handlers to enable. (e.g. `http`)
Postgres postgres.Config `mapstructure:"postgres"` // Postgres connection settings, used when Database selects Postgres.
}

View File

@@ -0,0 +1,27 @@
package brc20
import (
"github.com/Cleverse/go-utilities/utils"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/core/types"
)
// Version identifiers persisted in the indexer state and checked on startup
// (see Processor.VerifyStates) to detect incompatible databases/binaries.
const (
ClientVersion = "v0.0.1"
DBVersion = 1 // bump on incompatible database schema changes
EventHashVersion = 1 // bump on incompatible event-hash computation changes
)
// startingBlockHeader is the per-network block header from which indexing
// begins when the database contains no indexed blocks yet
// (used as the CurrentBlock fallback).
// NOTE(review): heights/hashes are assumed to be the BRC20 activation
// points for each network — confirm against the protocol spec.
var startingBlockHeader = map[common.Network]types.BlockHeader{
common.NetworkMainnet: {
Height: 767430,
Hash: *utils.Must(chainhash.NewHashFromStr("00000000000000000003f079883a81997d3238b287ea53904f1ecd3d1f225209")),
// PrevBlock: *utils.Must(chainhash.NewHashFromStr("00000000000000000001dcce6ce7c8a45872cafd1fb04732b447a14a91832591")),
},
common.NetworkTestnet: {
Height: 2413343,
Hash: *utils.Must(chainhash.NewHashFromStr("00000000000022e97030b143af785de812f836dd0651b6ac2b7dd9e90dc9abf9")),
// PrevBlock: *utils.Must(chainhash.NewHashFromStr("00000000000668f3bafac992f53424774515440cb47e1cb9e73af3f496139e28")),
},
}

View File

@@ -1,14 +1,16 @@
BEGIN;
DROP TABLE IF EXISTS "brc20_indexer_stats";
DROP TABLE IF EXISTS "brc20_indexer_state";
DROP TABLE IF EXISTS "brc20_indexer_states";
DROP TABLE IF EXISTS "brc20_indexed_blocks";
DROP TABLE IF EXISTS "brc20_processor_stats";
DROP TABLE IF EXISTS "brc20_tickers";
DROP TABLE IF EXISTS "brc20_ticker_states";
DROP TABLE IF EXISTS "brc20_deploy_events";
DROP TABLE IF EXISTS "brc20_mint_events";
DROP TABLE IF EXISTS "brc20_transfer_events";
DROP TABLE IF EXISTS "brc20_balances";
DROP TABLE IF EXISTS "brc20_inscriptions";
DROP TABLE IF EXISTS "brc20_inscription_entries";
DROP TABLE IF EXISTS "brc20_inscription_entry_states";
DROP TABLE IF EXISTS "brc20_inscription_transfers";
COMMIT;

View File

@@ -2,27 +2,21 @@ BEGIN;
-- Indexer Client Information
CREATE TABLE IF NOT EXISTS "brc20_indexer_stats" (
CREATE TABLE IF NOT EXISTS "brc20_indexer_states" (
"id" BIGSERIAL PRIMARY KEY,
"client_version" TEXT NOT NULL,
"network" TEXT NOT NULL,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS "brc20_indexer_state" (
"id" BIGSERIAL PRIMARY KEY,
"db_version" INT NOT NULL,
"event_hash_version" INT NOT NULL,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS brc20_indexer_state_created_at_idx ON "brc20_indexer_state" USING BTREE ("created_at" DESC);
CREATE INDEX IF NOT EXISTS brc20_indexer_state_created_at_idx ON "brc20_indexer_states" USING BTREE ("created_at" DESC);
-- BRC20 data
CREATE TABLE IF NOT EXISTS "brc20_indexed_blocks" (
"height" INT NOT NULL PRIMARY KEY,
"hash" TEXT NOT NULL,
"prev_hash" TEXT NOT NULL,
"event_hash" TEXT NOT NULL,
"cumulative_event_hash" TEXT NOT NULL
);

View File

@@ -1,8 +1,19 @@
-- name: GetLatestProcessorStats :one
SELECT * FROM "brc20_processor_stats" ORDER BY block_height DESC LIMIT 1;
-- name: GetLatestIndexedBlock :one
SELECT * FROM "brc20_indexed_blocks" ORDER BY "height" DESC LIMIT 1;
-- name: GetInscriptionsInOutPoint :many
SELECT * FROM "brc20_inscription_transfers" WHERE "new_satpoint_tx_hash" = @tx_hash AND "new_satpoint_out_idx" = @tx_out_idx;
-- name: GetIndexedBlockByHeight :one
SELECT * FROM "brc20_indexed_blocks" WHERE "height" = $1;
-- name: GetLatestProcessorStats :one
SELECT * FROM "brc20_processor_stats" ORDER BY "block_height" DESC LIMIT 1;
-- name: GetInscriptionsInOutPoints :many
SELECT "brc20_inscription_transfers".* FROM (
SELECT
unnest(@tx_hash_arr::text[]) AS "tx_hash",
unnest(@tx_out_idx_arr::int[]) AS "tx_out_idx"
) "inputs"
INNER JOIN "brc20_inscription_transfers" ON "inputs"."tx_hash" = "brc20_inscription_transfers"."new_satpoint_tx_hash" AND "inputs"."tx_out_idx" = "brc20_inscription_transfers"."new_satpoint_out_idx";
-- name: GetInscriptionEntriesByIds :many
WITH "states" AS (
@@ -10,11 +21,14 @@ WITH "states" AS (
SELECT DISTINCT ON ("id") * FROM "brc20_inscription_entry_states" WHERE "id" = ANY(@inscription_ids::text[]) ORDER BY "id", "block_height" DESC
)
SELECT * FROM "brc20_inscription_entries"
LEFT JOIN "states" ON "brc20_inscription_entries"."id" = states."id"
LEFT JOIN "states" ON "brc20_inscription_entries"."id" = "states"."id"
WHERE "brc20_inscription_entries"."id" = ANY(@inscription_ids::text[]);
-- name: CreateIndexedBlock :exec
INSERT INTO "brc20_indexed_blocks" ("height", "hash", "event_hash", "cumulative_event_hash") VALUES ($1, $2, $3, $4);
-- name: CreateProcessorStats :exec
INSERT INTO "brc20_processor_stats" (block_height, cursed_inscription_count, blessed_inscription_count, lost_sats) VALUES ($1, $2, $3, $4);
INSERT INTO "brc20_processor_stats" ("block_height", "cursed_inscription_count", "blessed_inscription_count", "lost_sats") VALUES ($1, $2, $3, $4);
-- name: CreateInscriptionEntries :batchexec
INSERT INTO "brc20_inscription_entries" ("id", "number", "sequence_number", "delegate", "metadata", "metaprotocol", "parents", "pointer", "content", "content_encoding", "content_type", "cursed", "cursed_for_brc20", "created_at", "created_at_height") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15);

View File

@@ -1,11 +1,5 @@
-- name: GetLatestIndexerState :one
SELECT * FROM brc20_indexer_state ORDER BY created_at DESC LIMIT 1;
SELECT * FROM brc20_indexer_states ORDER BY created_at DESC LIMIT 1;
-- name: SetIndexerState :exec
INSERT INTO brc20_indexer_state (db_version, event_hash_version) VALUES ($1, $2);
-- name: GetLatestIndexerStats :one
SELECT "client_version", "network" FROM brc20_indexer_stats ORDER BY id DESC LIMIT 1;
-- name: UpdateIndexerStats :exec
INSERT INTO brc20_indexer_stats (client_version, network) VALUES ($1, $2);
-- name: CreateIndexerState :exec
INSERT INTO brc20_indexer_states (client_version, network, db_version, event_hash_version) VALUES ($1, $2, $3, $4);

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/btcsuite/btcd/wire"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/brc20/internal/entity"
"github.com/gaze-network/indexer-network/modules/brc20/internal/ordinals"
)
@@ -22,12 +23,15 @@ type BRC20DataGatewayWithTx interface {
}
type BRC20ReaderDataGateway interface {
GetLatestBlock(ctx context.Context) (types.BlockHeader, error)
GetIndexedBlockByHeight(ctx context.Context, height int64) (*entity.IndexedBlock, error)
GetProcessorStats(ctx context.Context) (*entity.ProcessorStats, error)
GetInscriptionIdsInOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[ordinals.SatPoint][]ordinals.InscriptionId, error)
GetInscriptionIdsInOutPoints(ctx context.Context, outPoints []wire.OutPoint) (map[ordinals.SatPoint][]ordinals.InscriptionId, error)
GetInscriptionEntryById(ctx context.Context, id ordinals.InscriptionId) (*ordinals.InscriptionEntry, error)
}
type BRC20WriterDataGateway interface {
CreateIndexedBlock(ctx context.Context, block *entity.IndexedBlock) error
CreateProcessorStats(ctx context.Context, stats *entity.ProcessorStats) error
CreateInscriptionEntries(ctx context.Context, blockHeight uint64, entries []*ordinals.InscriptionEntry) error
CreateInscriptionEntryStates(ctx context.Context, blockHeight uint64, entryStates []*ordinals.InscriptionEntry) error

View File

@@ -0,0 +1,12 @@
package datagateway
import (
"context"
"github.com/gaze-network/indexer-network/modules/brc20/internal/entity"
)
// IndexerInfoDataGateway provides access to persisted indexer metadata
// (client version, db/event-hash versions, network), used to validate
// compatibility on startup.
type IndexerInfoDataGateway interface {
// GetLatestIndexerState returns the most recently created indexer state.
GetLatestIndexerState(ctx context.Context) (entity.IndexerState, error)
// CreateIndexerState persists a new indexer state record.
CreateIndexerState(ctx context.Context, state entity.IndexerState) error
}

View File

@@ -0,0 +1,10 @@
package entity
import "github.com/btcsuite/btcd/chaincfg/chainhash"
// IndexedBlock records a block processed by the BRC20 indexer together with
// the event hashes computed for it.
type IndexedBlock struct {
Height uint64 // block height
Hash chainhash.Hash // block hash
EventHash chainhash.Hash // hash of this block's events
CumulativeEventHash chainhash.Hash // running hash over all events up to and including this block
}

View File

@@ -0,0 +1,15 @@
package entity
import (
"time"
"github.com/gaze-network/indexer-network/common"
)
// IndexerState captures the versions and network an indexer run was started
// with; it is persisted once and compared on subsequent startups to detect
// incompatible databases (see Processor.VerifyStates).
type IndexerState struct {
CreatedAt time.Time // set by the database on insert
ClientVersion string
DBVersion int32
EventHashVersion int32
Network common.Network
}

View File

@@ -6,8 +6,6 @@ import (
"github.com/btcsuite/btcd/txscript"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/samber/lo"
)
@@ -58,7 +56,6 @@ func envelopesFromTapScript(tokenizer txscript.ScriptTokenizer, inputIndex int)
}
}
if tokenizer.Err() != nil {
logger.Warn("failed to parse tapscript", slogx.Error(tokenizer.Err()))
return envelopes
}
return envelopes

View File

@@ -7,17 +7,54 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/brc20/internal/datagateway"
"github.com/gaze-network/indexer-network/modules/brc20/internal/entity"
"github.com/gaze-network/indexer-network/modules/brc20/internal/ordinals"
"github.com/gaze-network/indexer-network/modules/brc20/internal/repository/postgres/gen"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
"github.com/samber/lo"
)
var _ datagateway.BRC20DataGateway = (*Repository)(nil)
// GetLatestBlock returns the header of the most recently indexed block.
//
// warning: only the Height and Hash fields are populated, because every
// current caller needs nothing else. Populate the remaining fields for type
// safety if that ever changes.
func (r *Repository) GetLatestBlock(ctx context.Context) (types.BlockHeader, error) {
	row, err := r.queries.GetLatestIndexedBlock(ctx)
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return types.BlockHeader{}, errors.WithStack(errs.NotFound)
		}
		return types.BlockHeader{}, errors.Wrap(err, "error during query")
	}
	parsedHash, err := chainhash.NewHashFromStr(row.Hash)
	if err != nil {
		return types.BlockHeader{}, errors.Wrap(err, "failed to parse block hash")
	}
	header := types.BlockHeader{
		Height: int64(row.Height),
		Hash:   *parsedHash,
	}
	return header, nil
}
// GetIndexedBlockByHeight implements datagateway.BRC20DataGateway.
func (r *Repository) GetIndexedBlockByHeight(ctx context.Context, height int64) (*entity.IndexedBlock, error) {
	row, err := r.queries.GetIndexedBlockByHeight(ctx, int32(height))
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, errors.WithStack(errs.NotFound)
	case err != nil:
		return nil, errors.Wrap(err, "error during query")
	}
	block, err := mapIndexedBlockModelToType(row)
	if err != nil {
		return nil, errors.Wrap(err, "failed to parse indexed block model")
	}
	return &block, nil
}
func (r *Repository) GetProcessorStats(ctx context.Context) (*entity.ProcessorStats, error) {
model, err := r.queries.GetLatestProcessorStats(ctx)
if err != nil {
@@ -30,10 +67,16 @@ func (r *Repository) GetProcessorStats(ctx context.Context) (*entity.ProcessorSt
return &stats, nil
}
func (r *Repository) GetInscriptionIdsInOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[ordinals.SatPoint][]ordinals.InscriptionId, error) {
models, err := r.queries.GetInscriptionsInOutPoint(ctx, gen.GetInscriptionsInOutPointParams{
TxHash: pgtype.Text{String: outPoint.Hash.String(), Valid: true},
TxOutIdx: pgtype.Int4{Int32: int32(outPoint.Index), Valid: true},
func (r *Repository) GetInscriptionIdsInOutPoints(ctx context.Context, outPoints []wire.OutPoint) (map[ordinals.SatPoint][]ordinals.InscriptionId, error) {
txHashArr := lo.Map(outPoints, func(outPoint wire.OutPoint, _ int) string {
return outPoint.Hash.String()
})
txOutIdxArr := lo.Map(outPoints, func(outPoint wire.OutPoint, _ int) int32 {
return int32(outPoint.Index)
})
models, err := r.queries.GetInscriptionsInOutPoints(ctx, gen.GetInscriptionsInOutPointsParams{
TxHashArr: txHashArr,
TxOutIdxArr: txOutIdxArr,
})
if err != nil {
return nil, errors.WithStack(err)
@@ -83,6 +126,14 @@ func (r *Repository) GetInscriptionEntryById(ctx context.Context, id ordinals.In
return &inscriptionEntry, nil
}
// CreateIndexedBlock persists a newly indexed block.
func (r *Repository) CreateIndexedBlock(ctx context.Context, block *entity.IndexedBlock) error {
	if err := r.queries.CreateIndexedBlock(ctx, mapIndexedBlockTypeToParams(*block)); err != nil {
		return errors.WithStack(err)
	}
	return nil
}
func (r *Repository) CreateProcessorStats(ctx context.Context, stats *entity.ProcessorStats) error {
params := mapProcessorStatsTypeToParams(*stats)
if err := r.queries.CreateProcessorStats(ctx, params); err != nil {

View File

@@ -11,8 +11,29 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const createIndexedBlock = `-- name: CreateIndexedBlock :exec
INSERT INTO "brc20_indexed_blocks" ("height", "hash", "event_hash", "cumulative_event_hash") VALUES ($1, $2, $3, $4)
`
// CreateIndexedBlockParams holds the bind parameters for createIndexedBlock.
// (sqlc-generated: regenerate rather than hand-editing.)
type CreateIndexedBlockParams struct {
Height int32
Hash string
EventHash string
CumulativeEventHash string
}
// CreateIndexedBlock inserts one indexed-block row.
// (sqlc-generated: regenerate rather than hand-editing.)
func (q *Queries) CreateIndexedBlock(ctx context.Context, arg CreateIndexedBlockParams) error {
_, err := q.db.Exec(ctx, createIndexedBlock,
arg.Height,
arg.Hash,
arg.EventHash,
arg.CumulativeEventHash,
)
return err
}
const createProcessorStats = `-- name: CreateProcessorStats :exec
INSERT INTO "brc20_processor_stats" (block_height, cursed_inscription_count, blessed_inscription_count, lost_sats) VALUES ($1, $2, $3, $4)
INSERT INTO "brc20_processor_stats" ("block_height", "cursed_inscription_count", "blessed_inscription_count", "lost_sats") VALUES ($1, $2, $3, $4)
`
type CreateProcessorStatsParams struct {
@@ -32,13 +53,29 @@ func (q *Queries) CreateProcessorStats(ctx context.Context, arg CreateProcessorS
return err
}
const getIndexedBlockByHeight = `-- name: GetIndexedBlockByHeight :one
SELECT height, hash, event_hash, cumulative_event_hash FROM "brc20_indexed_blocks" WHERE "height" = $1
`
// GetIndexedBlockByHeight fetches the indexed block at the given height.
// (sqlc-generated: regenerate rather than hand-editing.)
func (q *Queries) GetIndexedBlockByHeight(ctx context.Context, height int32) (Brc20IndexedBlock, error) {
row := q.db.QueryRow(ctx, getIndexedBlockByHeight, height)
var i Brc20IndexedBlock
err := row.Scan(
&i.Height,
&i.Hash,
&i.EventHash,
&i.CumulativeEventHash,
)
return i, err
}
const getInscriptionEntriesByIds = `-- name: GetInscriptionEntriesByIds :many
WITH "states" AS (
-- select latest state
SELECT DISTINCT ON ("id") id, block_height, transfer_count FROM "brc20_inscription_entry_states" WHERE "id" = ANY($1::text[]) ORDER BY "id", "block_height" DESC
)
SELECT brc20_inscription_entries.id, number, sequence_number, delegate, metadata, metaprotocol, parents, pointer, content, content_encoding, content_type, cursed, cursed_for_brc20, created_at, created_at_height, states.id, block_height, transfer_count FROM "brc20_inscription_entries"
LEFT JOIN "states" ON "brc20_inscription_entries"."id" = states."id"
LEFT JOIN "states" ON "brc20_inscription_entries"."id" = "states"."id"
WHERE "brc20_inscription_entries"."id" = ANY($1::text[])
`
@@ -102,17 +139,22 @@ func (q *Queries) GetInscriptionEntriesByIds(ctx context.Context, inscriptionIds
return items, nil
}
const getInscriptionsInOutPoint = `-- name: GetInscriptionsInOutPoint :many
SELECT inscription_id, block_height, old_satpoint_tx_hash, old_satpoint_out_idx, old_satpoint_offset, new_satpoint_tx_hash, new_satpoint_out_idx, new_satpoint_offset, new_pkscript, new_output_value, sent_as_fee FROM "brc20_inscription_transfers" WHERE "new_satpoint_tx_hash" = $1 AND "new_satpoint_out_idx" = $2
const getInscriptionsInOutPoints = `-- name: GetInscriptionsInOutPoints :many
SELECT brc20_inscription_transfers.inscription_id, brc20_inscription_transfers.block_height, brc20_inscription_transfers.old_satpoint_tx_hash, brc20_inscription_transfers.old_satpoint_out_idx, brc20_inscription_transfers.old_satpoint_offset, brc20_inscription_transfers.new_satpoint_tx_hash, brc20_inscription_transfers.new_satpoint_out_idx, brc20_inscription_transfers.new_satpoint_offset, brc20_inscription_transfers.new_pkscript, brc20_inscription_transfers.new_output_value, brc20_inscription_transfers.sent_as_fee FROM (
SELECT
unnest($1::text[]) AS "tx_hash",
unnest($2::int[]) AS "tx_out_idx"
) "inputs"
INNER JOIN "brc20_inscription_transfers" ON "inputs"."tx_hash" = "brc20_inscription_transfers"."new_satpoint_tx_hash" AND "inputs"."tx_out_idx" = "brc20_inscription_transfers"."new_satpoint_out_idx"
`
type GetInscriptionsInOutPointParams struct {
TxHash pgtype.Text
TxOutIdx pgtype.Int4
type GetInscriptionsInOutPointsParams struct {
TxHashArr []string
TxOutIdxArr []int32
}
func (q *Queries) GetInscriptionsInOutPoint(ctx context.Context, arg GetInscriptionsInOutPointParams) ([]Brc20InscriptionTransfer, error) {
rows, err := q.db.Query(ctx, getInscriptionsInOutPoint, arg.TxHash, arg.TxOutIdx)
func (q *Queries) GetInscriptionsInOutPoints(ctx context.Context, arg GetInscriptionsInOutPointsParams) ([]Brc20InscriptionTransfer, error) {
rows, err := q.db.Query(ctx, getInscriptionsInOutPoints, arg.TxHashArr, arg.TxOutIdxArr)
if err != nil {
return nil, err
}
@@ -143,8 +185,24 @@ func (q *Queries) GetInscriptionsInOutPoint(ctx context.Context, arg GetInscript
return items, nil
}
const getLatestIndexedBlock = `-- name: GetLatestIndexedBlock :one
SELECT height, hash, event_hash, cumulative_event_hash FROM "brc20_indexed_blocks" ORDER BY "height" DESC LIMIT 1
`
// GetLatestIndexedBlock fetches the highest-height indexed block.
// (sqlc-generated: regenerate rather than hand-editing.)
func (q *Queries) GetLatestIndexedBlock(ctx context.Context) (Brc20IndexedBlock, error) {
row := q.db.QueryRow(ctx, getLatestIndexedBlock)
var i Brc20IndexedBlock
err := row.Scan(
&i.Height,
&i.Hash,
&i.EventHash,
&i.CumulativeEventHash,
)
return i, err
}
const getLatestProcessorStats = `-- name: GetLatestProcessorStats :one
SELECT block_height, cursed_inscription_count, blessed_inscription_count, lost_sats FROM "brc20_processor_stats" ORDER BY block_height DESC LIMIT 1
SELECT block_height, cursed_inscription_count, blessed_inscription_count, lost_sats FROM "brc20_processor_stats" ORDER BY "block_height" DESC LIMIT 1
`
func (q *Queries) GetLatestProcessorStats(ctx context.Context) (Brc20ProcessorStat, error) {

View File

@@ -9,8 +9,29 @@ import (
"context"
)
const createIndexerState = `-- name: CreateIndexerState :exec
INSERT INTO brc20_indexer_states (client_version, network, db_version, event_hash_version) VALUES ($1, $2, $3, $4)
`
// CreateIndexerStateParams holds the bind parameters for createIndexerState.
// (sqlc-generated: regenerate rather than hand-editing.)
type CreateIndexerStateParams struct {
ClientVersion string
Network string
DbVersion int32
EventHashVersion int32
}
// CreateIndexerState inserts one indexer-state row.
// (sqlc-generated: regenerate rather than hand-editing.)
func (q *Queries) CreateIndexerState(ctx context.Context, arg CreateIndexerStateParams) error {
_, err := q.db.Exec(ctx, createIndexerState,
arg.ClientVersion,
arg.Network,
arg.DbVersion,
arg.EventHashVersion,
)
return err
}
const getLatestIndexerState = `-- name: GetLatestIndexerState :one
SELECT id, db_version, event_hash_version, created_at FROM brc20_indexer_state ORDER BY created_at DESC LIMIT 1
SELECT id, client_version, network, db_version, event_hash_version, created_at FROM brc20_indexer_states ORDER BY created_at DESC LIMIT 1
`
func (q *Queries) GetLatestIndexerState(ctx context.Context) (Brc20IndexerState, error) {
@@ -18,53 +39,11 @@ func (q *Queries) GetLatestIndexerState(ctx context.Context) (Brc20IndexerState,
var i Brc20IndexerState
err := row.Scan(
&i.Id,
&i.ClientVersion,
&i.Network,
&i.DbVersion,
&i.EventHashVersion,
&i.CreatedAt,
)
return i, err
}
const getLatestIndexerStats = `-- name: GetLatestIndexerStats :one
SELECT "client_version", "network" FROM brc20_indexer_stats ORDER BY id DESC LIMIT 1
`
type GetLatestIndexerStatsRow struct {
ClientVersion string
Network string
}
func (q *Queries) GetLatestIndexerStats(ctx context.Context) (GetLatestIndexerStatsRow, error) {
row := q.db.QueryRow(ctx, getLatestIndexerStats)
var i GetLatestIndexerStatsRow
err := row.Scan(&i.ClientVersion, &i.Network)
return i, err
}
const setIndexerState = `-- name: SetIndexerState :exec
INSERT INTO brc20_indexer_state (db_version, event_hash_version) VALUES ($1, $2)
`
type SetIndexerStateParams struct {
DbVersion int32
EventHashVersion int32
}
func (q *Queries) SetIndexerState(ctx context.Context, arg SetIndexerStateParams) error {
_, err := q.db.Exec(ctx, setIndexerState, arg.DbVersion, arg.EventHashVersion)
return err
}
const updateIndexerStats = `-- name: UpdateIndexerStats :exec
INSERT INTO brc20_indexer_stats (client_version, network) VALUES ($1, $2)
`
type UpdateIndexerStatsParams struct {
ClientVersion string
Network string
}
func (q *Queries) UpdateIndexerStats(ctx context.Context, arg UpdateIndexerStatsParams) error {
_, err := q.db.Exec(ctx, updateIndexerStats, arg.ClientVersion, arg.Network)
return err
}

View File

@@ -35,20 +35,14 @@ type Brc20DeployEvent struct {
type Brc20IndexedBlock struct {
Height int32
Hash string
PrevHash string
EventHash string
CumulativeEventHash string
}
type Brc20IndexerStat struct {
Id int64
ClientVersion string
Network string
CreatedAt pgtype.Timestamptz
}
type Brc20IndexerState struct {
Id int64
ClientVersion string
Network string
DbVersion int32
EventHashVersion int32
CreatedAt pgtype.Timestamptz

View File

@@ -0,0 +1,33 @@
package postgres
import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/modules/brc20/internal/datagateway"
"github.com/gaze-network/indexer-network/modules/brc20/internal/entity"
"github.com/jackc/pgx/v5"
)
var _ datagateway.IndexerInfoDataGateway = (*Repository)(nil)
// GetLatestIndexerState returns the most recently created indexer state row.
func (r *Repository) GetLatestIndexerState(ctx context.Context) (entity.IndexerState, error) {
	row, err := r.queries.GetLatestIndexerState(ctx)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return entity.IndexerState{}, errors.WithStack(errs.NotFound)
	case err != nil:
		return entity.IndexerState{}, errors.Wrap(err, "error during query")
	}
	return mapIndexerStatesModelToType(row), nil
}
// CreateIndexerState persists a new indexer state record.
func (r *Repository) CreateIndexerState(ctx context.Context, state entity.IndexerState) error {
	err := r.queries.CreateIndexerState(ctx, mapIndexerStatesTypeToParams(state))
	if err != nil {
		return errors.Wrap(err, "error during exec")
	}
	return nil
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/modules/brc20/internal/entity"
"github.com/gaze-network/indexer-network/modules/brc20/internal/ordinals"
"github.com/gaze-network/indexer-network/modules/brc20/internal/repository/postgres/gen"
@@ -14,6 +15,59 @@ import (
"github.com/samber/lo"
)
// mapIndexerStatesModelToType converts a sqlc-generated indexer-state row
// into the entity.IndexerState domain type.
func mapIndexerStatesModelToType(src gen.Brc20IndexerState) entity.IndexerState {
	// pgtype timestamps may be NULL; fall back to the zero time.
	var createdAt time.Time
	if src.CreatedAt.Valid {
		createdAt = src.CreatedAt.Time
	}
	return entity.IndexerState{
		ClientVersion: src.ClientVersion,
		Network:       common.Network(src.Network),
		// DbVersion/EventHashVersion are already int32; the previous
		// int32(...) conversions were redundant.
		DBVersion:        src.DbVersion,
		EventHashVersion: src.EventHashVersion,
		CreatedAt:        createdAt,
	}
}
// mapIndexerStatesTypeToParams converts an entity.IndexerState into the
// sqlc-generated params struct for the CreateIndexerState query.
// CreatedAt is intentionally omitted: the database sets it on insert.
func mapIndexerStatesTypeToParams(src entity.IndexerState) gen.CreateIndexerStateParams {
	return gen.CreateIndexerStateParams{
		ClientVersion: src.ClientVersion,
		Network:       string(src.Network),
		// Both version fields are already int32; the previous int32(...)
		// conversions were redundant.
		DbVersion:        src.DBVersion,
		EventHashVersion: src.EventHashVersion,
	}
}
// mapIndexedBlockModelToType converts a sqlc-generated indexed-block row
// into the entity.IndexedBlock domain type, parsing its hex-encoded hashes.
func mapIndexedBlockModelToType(src gen.Brc20IndexedBlock) (entity.IndexedBlock, error) {
	blockHash, err := chainhash.NewHashFromStr(src.Hash)
	if err != nil {
		return entity.IndexedBlock{}, errors.Wrap(err, "invalid block hash")
	}
	evHash, err := chainhash.NewHashFromStr(src.EventHash)
	if err != nil {
		return entity.IndexedBlock{}, errors.Wrap(err, "invalid event hash")
	}
	cumEvHash, err := chainhash.NewHashFromStr(src.CumulativeEventHash)
	if err != nil {
		return entity.IndexedBlock{}, errors.Wrap(err, "invalid cumulative event hash")
	}
	result := entity.IndexedBlock{
		Height:              uint64(src.Height),
		Hash:                *blockHash,
		EventHash:           *evHash,
		CumulativeEventHash: *cumEvHash,
	}
	return result, nil
}
// mapIndexedBlockTypeToParams converts an entity.IndexedBlock into the
// sqlc-generated params struct for the CreateIndexedBlock query.
func mapIndexedBlockTypeToParams(src entity.IndexedBlock) gen.CreateIndexedBlockParams {
	params := gen.CreateIndexedBlockParams{
		Height:              int32(src.Height),
		Hash:                src.Hash.String(),
		EventHash:           src.EventHash.String(),
		CumulativeEventHash: src.CumulativeEventHash.String(),
	}
	return params
}
func mapProcessorStatsModelToType(src gen.Brc20ProcessorStat) entity.ProcessorStats {
return entity.ProcessorStats{
BlockHeight: uint64(src.BlockHeight),

View File

@@ -6,6 +6,7 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/indexer"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/brc20/internal/datagateway"
@@ -20,9 +21,11 @@ var _ indexer.Processor[*types.Block] = (*Processor)(nil)
type Processor struct {
brc20Dg datagateway.BRC20DataGateway
indexerInfoDg datagateway.IndexerInfoDataGateway
btcClient btcclient.Contract
network common.Network
transferCountLimit uint32 // number of transfers to track per inscription
cleanupFuncs []func(context.Context) error
// block states
flotsamsSentAsFee []*Flotsam
@@ -45,28 +48,26 @@ type Processor struct {
// TODO: move this to config
const outPointValueCacheSize = 100000
func NewProcessor(ctx context.Context, brc20Dg datagateway.BRC20DataGateway, btcClient btcclient.Contract, network common.Network, transferCountLimit uint32) (*Processor, error) {
func NewProcessor(brc20Dg datagateway.BRC20DataGateway, indexerInfoDg datagateway.IndexerInfoDataGateway, btcClient btcclient.Contract, network common.Network, transferCountLimit uint32, cleanupFuncs []func(context.Context) error) (*Processor, error) {
outPointValueCache, err := lru.New[wire.OutPoint, uint64](outPointValueCacheSize)
if err != nil {
return nil, errors.Wrap(err, "failed to create outPointValueCache")
}
stats, err := brc20Dg.GetProcessorStats(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to count cursed inscriptions")
}
return &Processor{
brc20Dg: brc20Dg,
indexerInfoDg: indexerInfoDg,
btcClient: btcClient,
network: network,
transferCountLimit: transferCountLimit,
cleanupFuncs: cleanupFuncs,
flotsamsSentAsFee: make([]*Flotsam, 0),
blockReward: 0,
cursedInscriptionCount: stats.CursedInscriptionCount,
blessedInscriptionCount: stats.BlessedInscriptionCount,
lostSats: stats.LostSats,
cursedInscriptionCount: 0, // to be initialized by p.VerifyStates()
blessedInscriptionCount: 0, // to be initialized by p.VerifyStates()
lostSats: 0, // to be initialized by p.VerifyStates()
outPointValueCache: outPointValueCache,
newInscriptionTransfers: make([]*entity.InscriptionTransfer, 0),
@@ -77,25 +78,90 @@ func NewProcessor(ctx context.Context, brc20Dg datagateway.BRC20DataGateway, btc
// VerifyStates implements indexer.Processor.
//
// It validates the persisted indexer state (db version, event-hash version,
// network) against this binary's constants — creating the state record on
// first run — then loads the latest processor stats (cursed/blessed
// inscription counts, lost sats) into the in-memory processor state.
func (p *Processor) VerifyStates(ctx context.Context) error {
	indexerState, err := p.indexerInfoDg.GetLatestIndexerState(ctx)
	if err != nil && !errors.Is(err, errs.NotFound) {
		return errors.Wrap(err, "failed to get latest indexer state")
	}
	// if not found, create indexer state
	if errors.Is(err, errs.NotFound) {
		if err := p.indexerInfoDg.CreateIndexerState(ctx, entity.IndexerState{
			ClientVersion:    ClientVersion,
			DBVersion:        DBVersion,
			EventHashVersion: EventHashVersion,
			Network:          p.network,
		}); err != nil {
			return errors.Wrap(err, "failed to set indexer state")
		}
	} else {
		if indexerState.DBVersion != DBVersion {
			return errors.Wrapf(errs.ConflictSetting, "db version mismatch: current version is %d. Please upgrade to version %d", indexerState.DBVersion, DBVersion)
		}
		if indexerState.EventHashVersion != EventHashVersion {
			// Fixed: original format string had one verb for two arguments
			// (a go vet printf error) and referred to the runes module.
			return errors.Wrapf(errs.ConflictSetting, "event hash version mismatch: current version is %d, expected %d. Please reset the BRC20 database first.", indexerState.EventHashVersion, EventHashVersion)
		}
		if indexerState.Network != p.network {
			// Fixed: networks are string-valued, so %s rather than %d.
			return errors.Wrapf(errs.ConflictSetting, "network mismatch: latest indexed network is %s, configured network is %s. If you want to change the network, please reset the database", indexerState.Network, p.network)
		}
	}
	stats, err := p.brc20Dg.GetProcessorStats(ctx)
	if err != nil {
		if !errors.Is(err, errs.NotFound) {
			// Fixed: wrap message now describes the actual operation.
			return errors.Wrap(err, "failed to get latest processor stats")
		}
		// No stats persisted yet: start counting from the configured
		// starting block with zeroed counters.
		stats = &entity.ProcessorStats{
			BlockHeight:             uint64(startingBlockHeader[p.network].Height),
			CursedInscriptionCount:  0,
			BlessedInscriptionCount: 0,
			LostSats:                0,
		}
	}
	p.cursedInscriptionCount = stats.CursedInscriptionCount
	p.blessedInscriptionCount = stats.BlessedInscriptionCount
	p.lostSats = stats.LostSats
	return nil
}
// CurrentBlock implements indexer.Processor.
//
// It returns the latest block this module has persisted; if nothing has been
// indexed yet (NotFound), it falls back to the module's starting block header
// for the configured network so indexing begins at the correct height.
// Fixed: removed a stray panic("unimplemented") that made the body unreachable.
func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
	blockHeader, err := p.brc20Dg.GetLatestBlock(ctx)
	if err != nil {
		if errors.Is(err, errs.NotFound) {
			return startingBlockHeader[p.network], nil
		}
		return types.BlockHeader{}, errors.Wrap(err, "failed to get latest block")
	}
	return blockHeader, nil
}
// GetIndexedBlock implements indexer.Processor.
//
// It looks up the indexed block stored at the given height and adapts it to a
// types.BlockHeader carrying only the height and hash.
// Fixed: removed a stray panic("unimplemented") that made the body unreachable.
func (p *Processor) GetIndexedBlock(ctx context.Context, height int64) (types.BlockHeader, error) {
	block, err := p.brc20Dg.GetIndexedBlockByHeight(ctx, height)
	if err != nil {
		return types.BlockHeader{}, errors.Wrap(err, "failed to get indexed block")
	}
	return types.BlockHeader{
		Height: int64(block.Height),
		Hash:   block.Hash,
	}, nil
}
// Name implements indexer.Processor. It returns the module identifier used to
// register and address this processor.
// Fixed: removed a stray panic("unimplemented") that made the return unreachable.
func (p *Processor) Name() string {
	return "brc20"
}
// RevertData implements indexer.Processor.
//
// Not yet implemented: reverting indexed data from the given height is what
// reorg handling relies on, so calling this currently aborts the process.
// NOTE(review): must be implemented before this module can survive a chain
// reorganization.
func (p *Processor) RevertData(ctx context.Context, from int64) error {
	panic("unimplemented")
}
// Shutdown runs every registered cleanup function, collecting failures rather
// than short-circuiting on the first one, and returns them joined into a
// single error (nil when all cleanups succeed).
func (p *Processor) Shutdown(ctx context.Context) error {
	// named cleanupErrs to avoid shadowing the imported "errs" package
	var cleanupErrs []error
	for _, cleanup := range p.cleanupFuncs {
		if err := cleanup(ctx); err != nil {
			cleanupErrs = append(cleanupErrs, err)
		}
	}
	return errors.WithStack(errors.Join(cleanupErrs...))
}

View File

@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"slices"
"strings"
"github.com/btcsuite/btcd/blockchain"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@@ -14,12 +13,34 @@ import (
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/brc20/internal/entity"
"github.com/gaze-network/indexer-network/modules/brc20/internal/ordinals"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/samber/lo"
)
func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockHeader types.BlockHeader) error {
floatingInscriptions := make([]*Flotsam, 0)
func (p *Processor) processInscriptionTx(ctx context.Context, tx *types.Transaction, blockHeader types.BlockHeader) error {
ctx = logger.WithContext(ctx, slogx.String("tx_hash", tx.TxHash.String()))
envelopes := ordinals.ParseEnvelopesFromTx(tx)
inputOutPoints := lo.Map(tx.TxIn, func(txIn *types.TxIn, _ int) wire.OutPoint {
return wire.OutPoint{
Hash: txIn.PreviousOutTxHash,
Index: txIn.PreviousOutIndex,
}
})
inscriptionIdsInOutPoints, err := p.getInscriptionIdsInOutPoints(ctx, inputOutPoints)
if err != nil {
return errors.Wrap(err, "failed to get inscriptions in outpoints")
}
if len(envelopes) == 0 && len(inscriptionIdsInOutPoints) == 0 {
// no inscription activity, skip
return nil
}
logger.DebugContext(ctx, "Processing new tx",
slogx.String("tx_hash", tx.TxHash.String()),
slogx.Uint32("tx_index", tx.Index),
)
floatingInscriptions := make([]*Flotsam, 0)
totalInputValue := uint64(0)
totalOutputValue := lo.SumBy(tx.TxOut, func(txOut *types.TxOut) uint64 { return uint64(txOut.Value) })
inscribeOffsets := make(map[uint64]*struct {
@@ -27,12 +48,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
count int
})
idCounter := uint32(0)
inputValues, err := p.getOutPointValues(ctx, lo.Map(tx.TxIn, func(txIn *types.TxIn, _ int) wire.OutPoint {
return wire.OutPoint{
Hash: txIn.PreviousOutTxHash,
Index: txIn.PreviousOutIndex,
}
}))
inputValues, err := p.getOutPointValues(ctx, inputOutPoints)
if err != nil {
return errors.Wrap(err, "failed to get outpoint values")
}
@@ -48,11 +64,8 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
continue
}
inscriptions, err := p.getInscriptionIdsInOutPoint(ctx, inputOutPoint)
if err != nil {
return errors.Wrap(err, "failed to get inscriptions in outpoint")
}
for satPoint, inscriptionIds := range inscriptions {
inscriptionIdsInOutPoint := inscriptionIdsInOutPoints[inputOutPoint]
for satPoint, inscriptionIds := range inscriptionIdsInOutPoint {
offset := totalInputValue + satPoint.Offset
for _, inscriptionId := range inscriptionIds {
floatingInscriptions = append(floatingInscriptions, &Flotsam{
@@ -344,19 +357,6 @@ func (p *Processor) updateInscriptionLocation(ctx context.Context, newSatPoint o
}
func isBRC20Inscription(inscription ordinals.Inscription) bool {
// check content type
if inscription.ContentType != "" {
contentType := inscription.ContentType
if lo.Contains([]string{"text/plain", "application/json"}, contentType) {
return true
}
prefixes := []string{"text/plain;", "application/json;"}
for _, prefix := range prefixes {
if strings.HasPrefix(contentType, prefix) {
return true
}
}
}
// attempt to parse content as json
if inscription.Content != nil {
var parsed interface{}
@@ -406,12 +406,19 @@ func (p *Processor) getOutPointValues(ctx context.Context, outPoints []wire.OutP
return outPointValues, nil
}
func (p *Processor) getInscriptionIdsInOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[ordinals.SatPoint][]ordinals.InscriptionId, error) {
inscriptions, err := p.brc20Dg.GetInscriptionIdsInOutPoint(ctx, outPoint)
func (p *Processor) getInscriptionIdsInOutPoints(ctx context.Context, outPoints []wire.OutPoint) (map[wire.OutPoint]map[ordinals.SatPoint][]ordinals.InscriptionId, error) {
inscriptionIds, err := p.brc20Dg.GetInscriptionIdsInOutPoints(ctx, outPoints)
if err != nil {
return nil, errors.Wrap(err, "failed to get inscriptions by outpoint")
}
return inscriptions, nil
result := make(map[wire.OutPoint]map[ordinals.SatPoint][]ordinals.InscriptionId)
for satPoint, inscriptionIds := range inscriptionIds {
if _, ok := result[satPoint.OutPoint]; !ok {
result[satPoint.OutPoint] = make(map[ordinals.SatPoint][]ordinals.InscriptionId)
}
result[satPoint.OutPoint][satPoint] = append(result[satPoint.OutPoint][satPoint], inscriptionIds...)
}
return result, nil
}
func (p *Processor) getInscriptionEntryById(ctx context.Context, inscriptionId ordinals.InscriptionId) (*ordinals.InscriptionEntry, error) {

View File

@@ -3,10 +3,106 @@ package brc20
import (
"context"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/brc20/internal/entity"
"github.com/gaze-network/indexer-network/modules/brc20/internal/ordinals"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
"github.com/pkg/errors"
"github.com/samber/lo"
)
// Process implements indexer.Processor.
func (p *Processor) Process(ctx context.Context, inputs []*types.Block) error {
panic("unimplemented")
func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error {
for _, block := range blocks {
ctx = logger.WithContext(ctx, slogx.Uint64("height", uint64(block.Header.Height)))
logger.DebugContext(ctx, "Processing new block")
p.blockReward = p.getBlockSubsidy(uint64(block.Header.Height))
p.flotsamsSentAsFee = make([]*Flotsam, 0)
for _, tx := range block.Transactions {
if err := p.processInscriptionTx(ctx, tx, block.Header); err != nil {
return errors.Wrap(err, "failed to process tx")
}
}
// TODO: add brc20 processing
if err := p.flushBlock(ctx, block.Header); err != nil {
return errors.Wrap(err, "failed to flush block")
}
logger.DebugContext(ctx, "Inserted new block")
}
return nil
}
// flushBlock atomically persists everything accumulated while processing one
// block: the indexed block row, new inscription entries and their states, new
// inscription transfers, and a processor-stats snapshot for this height. All
// writes share a single data-gateway transaction; the deferred Rollback
// undoes them if Commit is never reached. The in-memory accumulators are
// reset after each successful write so the next block starts clean.
func (p *Processor) flushBlock(ctx context.Context, blockHeader types.BlockHeader) error {
	brc20DgTx, err := p.brc20Dg.BeginBRC20Tx(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to begin transaction")
	}
	// Safety net for early returns. NOTE(review): Rollback also runs after a
	// successful Commit and may then return a "tx already closed" error,
	// emitting a spurious warn log — confirm the gateway's Rollback semantics.
	defer func() {
		if err := brc20DgTx.Rollback(ctx); err != nil {
			logger.WarnContext(ctx, "failed to rollback transaction",
				slogx.Error(err),
				slogx.String("event", "rollback_brc20_insertion"),
			)
		}
	}()
	blockHeight := uint64(blockHeader.Height)
	// CreateIndexedBlock must be performed before other flush methods to correctly calculate event hash
	// TODO: calculate event hash
	if err := brc20DgTx.CreateIndexedBlock(ctx, &entity.IndexedBlock{
		Height:              blockHeight,
		Hash:                blockHeader.Hash,
		EventHash:           chainhash.Hash{},
		CumulativeEventHash: chainhash.Hash{},
	}); err != nil {
		return errors.Wrap(err, "failed to create indexed block")
	}
	// flush new inscription entries
	{
		newInscriptionEntries := lo.Values(p.newInscriptionEntries)
		if err := brc20DgTx.CreateInscriptionEntries(ctx, blockHeight, newInscriptionEntries); err != nil {
			return errors.Wrap(err, "failed to create inscription entries")
		}
		p.newInscriptionEntries = make(map[ordinals.InscriptionId]*ordinals.InscriptionEntry)
	}
	// flush new inscription entry states
	{
		newInscriptionEntryStates := lo.Values(p.newInscriptionEntryStates)
		if err := brc20DgTx.CreateInscriptionEntryStates(ctx, blockHeight, newInscriptionEntryStates); err != nil {
			return errors.Wrap(err, "failed to create inscription entry states")
		}
		p.newInscriptionEntryStates = make(map[ordinals.InscriptionId]*ordinals.InscriptionEntry)
	}
	// flush new inscription transfers (comment fixed: previously a copy-paste
	// of the "entry states" heading above)
	{
		if err := brc20DgTx.CreateInscriptionTransfers(ctx, p.newInscriptionTransfers); err != nil {
			return errors.Wrap(err, "failed to create inscription transfers")
		}
		p.newInscriptionTransfers = make([]*entity.InscriptionTransfer, 0)
	}
	// flush processor stats snapshot for this height
	{
		stats := &entity.ProcessorStats{
			BlockHeight:             blockHeight,
			CursedInscriptionCount:  p.cursedInscriptionCount,
			BlessedInscriptionCount: p.blessedInscriptionCount,
			LostSats:                p.lostSats,
		}
		if err := brc20DgTx.CreateProcessorStats(ctx, stats); err != nil {
			return errors.Wrap(err, "failed to create processor stats")
		}
	}
	if err := brc20DgTx.Commit(ctx); err != nil {
		return errors.Wrap(err, "failed to commit transaction")
	}
	return nil
}