feat: flush new entry states and indexed blocks

This commit is contained in:
Gaze
2024-04-13 01:59:54 +07:00
parent 0f1ec2a21e
commit 901946af38
5 changed files with 93 additions and 28 deletions

View File

@@ -46,7 +46,8 @@ type RunesWriterDataGateway interface {
// Rollback() must be safe to call even if no transaction is active. Hence, a defer Rollback() is safe, even if Commit() was called prior with non-error conditions.
Rollback(ctx context.Context) error
CreateRuneEntry(ctx context.Context, entry *runes.RuneEntry) error
CreateRuneEntry(ctx context.Context, entry *runes.RuneEntry, blockHeight uint64) error
CreateRuneEntryState(ctx context.Context, entry *runes.RuneEntry, blockHeight uint64) error
CreateRuneBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint, balances map[runes.RuneId]uint128.Uint128) error
CreateRuneBalancesAtBlock(ctx context.Context, params []CreateRuneBalancesAtBlockParams) error
UpdateLatestBlock(ctx context.Context, blockHeader types.BlockHeader) error

View File

@@ -115,7 +115,7 @@ func mapRuneEntryModelToType(src gen.GetRuneEntriesByRuneIdsRow) (runes.RuneEntr
}, nil
}
func mapRuneEntryTypeToParams(src runes.RuneEntry) (gen.CreateRuneEntryParams, gen.CreateRuneEntryStateParams, error) {
func mapRuneEntryTypeToParams(src runes.RuneEntry, blockHeight uint64) (gen.CreateRuneEntryParams, gen.CreateRuneEntryStateParams, error) {
runeId := src.RuneId.String()
rune := src.SpacedRune.Rune.String()
spacers := int32(src.SpacedRune.Spacers)
@@ -194,6 +194,7 @@ func mapRuneEntryTypeToParams(src runes.RuneEntry) (gen.CreateRuneEntryParams, g
TermsOffsetStart: termsOffsetStart,
TermsOffsetEnd: termsOffsetEnd,
}, gen.CreateRuneEntryStateParams{
BlockHeight: int32(blockHeight),
RuneID: runeId,
Mints: mints,
BurnedAmount: burnedAmount,

View File

@@ -133,7 +133,7 @@ func (r *Repository) GetRuneEntryByRuneId(ctx context.Context, runeId runes.Rune
}
func (r *Repository) GetRuneEntryByRuneIdBatch(ctx context.Context, runeIds []runes.RuneId) (map[runes.RuneId]*runes.RuneEntry, error) {
runeEntryModels, err := r.queries.GetRuneEntriesByRuneIds(ctx, lo.Map(runeIds, func(runeId runes.RuneId, _ int) string {
rows, err := r.queries.GetRuneEntriesByRuneIds(ctx, lo.Map(runeIds, func(runeId runes.RuneId, _ int) string {
return runeId.String()
}))
if err != nil {
@@ -143,9 +143,9 @@ func (r *Repository) GetRuneEntryByRuneIdBatch(ctx context.Context, runeIds []ru
return nil, errors.Wrap(err, "error during query")
}
runeEntries := make(map[runes.RuneId]*runes.RuneEntry, len(runeEntryModels))
runeEntries := make(map[runes.RuneId]*runes.RuneEntry, len(rows))
var errs []error
for i, runeEntryModel := range runeEntryModels {
for i, runeEntryModel := range rows {
runeEntry, err := mapRuneEntryModelToType(runeEntryModel)
if err != nil {
errs = append(errs, errors.Wrapf(err, "failed to parse rune entry model index %d", i))
@@ -179,11 +179,11 @@ func (r *Repository) CreateRuneTransaction(ctx context.Context, tx *entity.RuneT
return nil
}
func (r *Repository) CreateRuneEntry(ctx context.Context, entry *runes.RuneEntry) error {
func (r *Repository) CreateRuneEntry(ctx context.Context, entry *runes.RuneEntry, blockHeight uint64) error {
if entry == nil {
return nil
}
createParams, createStateParams, err := mapRuneEntryTypeToParams(*entry)
createParams, createStateParams, err := mapRuneEntryTypeToParams(*entry, blockHeight)
if err != nil {
return errors.Wrap(err, "failed to map rune entry to params")
}
@@ -196,11 +196,11 @@ func (r *Repository) CreateRuneEntry(ctx context.Context, entry *runes.RuneEntry
return nil
}
func (r *Repository) CreateRuneEntryState(ctx context.Context, entry *runes.RuneEntry) error {
func (r *Repository) CreateRuneEntryState(ctx context.Context, entry *runes.RuneEntry, blockHeight uint64) error {
if entry == nil {
return nil
}
_, createStateParams, err := mapRuneEntryTypeToParams(*entry)
_, createStateParams, err := mapRuneEntryTypeToParams(*entry, blockHeight)
if err != nil {
return errors.Wrap(err, "failed to map rune entry to params")
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/bitcoin/btcclient"
"github.com/gaze-network/indexer-network/modules/runes/internal/datagateway"
"github.com/gaze-network/indexer-network/modules/runes/internal/runes"
)
var _ indexers.BitcoinProcessor = (*Processor)(nil)
@@ -18,6 +19,8 @@ type Processor struct {
bitcoinClient btcclient.Contract
bitcoinDataSource indexers.BitcoinDatasource
network common.Network
newRuneEntryStates map[runes.RuneId]*runes.RuneEntry
}
type NewProcessorParams struct {
@@ -29,10 +32,11 @@ type NewProcessorParams struct {
func NewProcessor(params NewProcessorParams) *Processor {
return &Processor{
runesDg: params.RunesDg,
bitcoinClient: params.BitcoinClient,
bitcoinDataSource: params.BitcoinDataSource,
network: params.Network,
runesDg: params.RunesDg,
bitcoinClient: params.BitcoinClient,
bitcoinDataSource: params.BitcoinDataSource,
network: params.Network,
newRuneEntryStates: make(map[runes.RuneId]*runes.RuneEntry),
}
}

View File

@@ -5,11 +5,13 @@ import (
"context"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/txscript"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/internal/runes"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/uint128"
@@ -33,7 +35,12 @@ func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error {
return errors.Wrap(err, "failed to process tx")
}
}
// TODO: create indexed block in db
if err := p.flushNewRuneEntryStates(ctx, uint64(block.Header.Height)); err != nil {
return errors.Wrap(err, "failed to flush new rune entry states")
}
if err := p.createIndexedBlock(ctx, block.Header); err != nil {
return errors.Wrap(err, "failed to create indexed block")
}
}
if err := p.runesDg.Commit(ctx); err != nil {
return errors.Wrap(err, "failed to commit transaction")
@@ -152,7 +159,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
}
if etching != nil {
if err := p.createRuneEntry(ctx, runestone, etchedRuneId, etchedRune); err != nil {
if err := p.createRuneEntry(ctx, runestone, etchedRuneId, etchedRune, uint64(tx.BlockHeight)); err != nil {
return errors.Wrap(err, "failed to create rune entry")
}
}
@@ -215,7 +222,7 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
}
// increment burned amounts in rune entries
if err := p.updatedBurnedAmount(ctx, burned); err != nil {
if err := p.incrementBurnedAmount(ctx, burned); err != nil {
return errors.Wrap(err, "failed to update burned amount")
}
return nil
@@ -252,9 +259,8 @@ func (p *Processor) mint(ctx context.Context, runeId runes.RuneId, height uint64
return uint128.Zero, nil
}
runeEntry.Mints = runeEntry.Mints.Add64(1)
if err := p.runesDg.SetRuneEntry(ctx, runeEntry); err != nil {
return uint128.Uint128{}, errors.Wrap(err, "failed to set rune entry")
if err := p.incrementMintCount(ctx, runeId, 1); err != nil {
return uint128.Zero, errors.Wrap(err, "failed to increment mint count")
}
return amount, nil
}
@@ -360,7 +366,7 @@ func removeAnnexFromWitness(witness [][]byte) [][]byte {
return witness
}
func (p *Processor) createRuneEntry(ctx context.Context, runestone *runes.Runestone, runeId runes.RuneId, rune runes.Rune) error {
func (p *Processor) createRuneEntry(ctx context.Context, runestone *runes.Runestone, runeId runes.RuneId, rune runes.Rune, blockHeight uint64) error {
var runeEntry *runes.RuneEntry
if runestone.Cenotaph {
runeEntry = &runes.RuneEntry{
@@ -388,26 +394,79 @@ func (p *Processor) createRuneEntry(ctx context.Context, runestone *runes.Runest
CompletionTime: time.Time{},
}
}
if err := p.runesDg.SetRuneEntry(ctx, runeEntry); err != nil {
if err := p.runesDg.CreateRuneEntry(ctx, runeEntry, blockHeight); err != nil {
return errors.Wrap(err, "failed to set rune entry")
}
return nil
}
func (p *Processor) updatedBurnedAmount(ctx context.Context, burned map[runes.RuneId]uint128.Uint128) error {
runeEntries, err := p.runesDg.GetRuneEntryByRuneIdBatch(ctx, lo.Keys(burned))
if err != nil {
return errors.Wrap(err, "failed to get rune entry by rune id batch")
func (p *Processor) incrementMintCount(ctx context.Context, runeId runes.RuneId, count uint64) (err error) {
runeEntry, ok := p.newRuneEntryStates[runeId]
if !ok {
runeEntry, err = p.runesDg.GetRuneEntryByRuneId(ctx, runeId)
if err != nil {
return errors.Wrap(err, "failed to get rune entry by rune id")
}
}
runeEntry.Mints = runeEntry.Mints.Add64(count)
p.newRuneEntryStates[runeId] = runeEntry
return nil
}
func (p *Processor) incrementBurnedAmount(ctx context.Context, burned map[runes.RuneId]uint128.Uint128) (err error) {
runeEntries := make(map[runes.RuneId]*runes.RuneEntry)
fetchRuneIds := make([]runes.RuneId, 0)
for runeId := range burned {
runeEntry, ok := p.newRuneEntryStates[runeId]
if !ok {
fetchRuneIds = append(fetchRuneIds, runeId)
} else {
runeEntries[runeId] = runeEntry
}
}
if len(fetchRuneIds) > 0 {
entries, err := p.runesDg.GetRuneEntryByRuneIdBatch(ctx, fetchRuneIds)
if err != nil {
return errors.Wrap(err, "failed to get rune entry by rune id batch")
}
if len(entries) != len(fetchRuneIds) {
// there are missing entries, db may be corrupted
return errors.Errorf("missing rune entries: expected %d, got %d", len(fetchRuneIds), len(entries))
}
for runeId, entry := range entries {
runeEntries[runeId] = entry
}
}
// update rune entries
for runeId, amount := range burned {
runeEntry, ok := runeEntries[runeId]
if !ok {
continue
}
runeEntry.BurnedAmount = runeEntry.BurnedAmount.Add(amount)
if err := p.runesDg.SetRuneEntry(ctx, runeEntry); err != nil {
return errors.Wrap(err, "failed to set rune entry")
}
}
return nil
}
func (p *Processor) flushNewRuneEntryStates(ctx context.Context, blockHeight uint64) error {
for _, runeEntry := range p.newRuneEntryStates {
if err := p.runesDg.CreateRuneEntryState(ctx, runeEntry, blockHeight); err != nil {
return errors.Wrap(err, "failed to create rune entry state")
}
}
p.newRuneEntryStates = make(map[runes.RuneId]*runes.RuneEntry)
return nil
}
func (p *Processor) createIndexedBlock(ctx context.Context, header types.BlockHeader) error {
if err := p.runesDg.CreateIndexedBlock(ctx, &entity.IndexedBlock{
Height: header.Height,
Hash: header.Hash,
EventHash: chainhash.Hash{}, // TODO: calculate event hash
CumulativeEventHash: chainhash.Hash{}, // TODO: calculate cumulative event hash
}); err != nil {
return errors.Wrap(err, "failed to create indexed block")
}
return nil
}