From 51de5e945de6f02b0d3c153a0445c9ca88f736d1 Mon Sep 17 00:00:00 2001 From: Gaze Date: Sat, 20 Apr 2024 19:15:37 +0700 Subject: [PATCH] feat: optimize runes processor by batching GetInputBalance --- .../database/postgresql/queries/data.sql | 3 + modules/runes/datagateway/runes.go | 1 + modules/runes/processor_process.go | 88 +++++++++++++------ .../runes/repository/postgres/gen/batch.go | 72 +++++++++++++++ modules/runes/repository/postgres/runes.go | 29 ++++++ 5 files changed, 164 insertions(+), 29 deletions(-) diff --git a/modules/runes/database/postgresql/queries/data.sql b/modules/runes/database/postgresql/queries/data.sql index 2abf64e..c89db8a 100644 --- a/modules/runes/database/postgresql/queries/data.sql +++ b/modules/runes/database/postgresql/queries/data.sql @@ -16,6 +16,9 @@ SELECT * FROM runes_balances WHERE pkscript = $1 AND rune_id = $2 AND block_heig -- name: GetOutPointBalancesAtOutPoint :many SELECT * FROM runes_outpoint_balances WHERE tx_hash = $1 AND tx_idx = $2; +-- name: GetOutPointBalancesAtOutPointBatch :batchmany +SELECT * FROM runes_outpoint_balances WHERE tx_hash = $1 AND tx_idx = $2; + -- name: GetUnspentOutPointBalancesByPkScript :many SELECT * FROM runes_outpoint_balances WHERE pkscript = @pkScript AND block_height <= @block_height AND (spent_height IS NULL OR spent_height > @block_height); diff --git a/modules/runes/datagateway/runes.go b/modules/runes/datagateway/runes.go index 76f2e55..fe784c8 100644 --- a/modules/runes/datagateway/runes.go +++ b/modules/runes/datagateway/runes.go @@ -29,6 +29,7 @@ type RunesReaderDataGateway interface { GetRuneTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error) GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]*entity.OutPointBalance, error) + GetRunesBalancesAtOutPointBatch(ctx context.Context, outPoints []wire.OutPoint) (map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance, error) GetUnspentOutPointBalancesByPkScript(ctx context.Context, pkScript []byte, blockHeight uint64) ([]*entity.OutPointBalance, error) // GetRuneIdFromRune returns the RuneId for the given rune. Returns errs.NotFound if the rune entry is not found. GetRuneIdFromRune(ctx context.Context, rune runes.Rune) (runes.RuneId, error) diff --git a/modules/runes/processor_process.go b/modules/runes/processor_process.go index e265160..2f86fe7 100644 --- a/modules/runes/processor_process.go +++ b/modules/runes/processor_process.go @@ -23,12 +23,22 @@ import ( ) func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error { + // collect txs to batch get input balances + txs := make([]*types.Transaction, 0) + for _, block := range blocks { + txs = append(txs, block.Transactions...) + } + inputBalances, err := p.getInputBalances(ctx, txs) + if err != nil { + return errors.Wrap(err, "failed to get input balances") + } + for _, block := range blocks { ctx := logger.WithContext(ctx, slog.Int("block_height", int(block.Header.Height))) logger.DebugContext(ctx, "[RunesProcessor] Processing block", slog.Int("txs", len(block.Transactions))) for _, tx := range block.Transactions { - if err := p.processTx(ctx, tx, block.Header); err != nil { + if err := p.processTx(ctx, tx, block.Header, inputBalances[tx.TxHash]); err != nil { return errors.Wrap(err, "failed to process tx") } } @@ -39,7 +49,7 @@ func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error { return nil } -func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockHeader types.BlockHeader) error { +func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockHeader types.BlockHeader, inputBalances map[int]map[runes.RuneId]*entity.OutPointBalance) error { if tx.BlockHeight < int64(runes.FirstRuneHeight(p.network)) { // prevent processing txs before the activation height return nil @@ -49,11 +59,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH return errors.Wrap(err, "failed to decipher runestone") } - inputBalances, err := p.getInputBalances(ctx, tx.TxIn) - if err != nil { - return errors.Wrap(err, "failed to get input balances") - } - if runestone == nil && len(inputBalances) == 0 { // no runes involved in this tx return nil @@ -319,19 +324,32 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH return nil } -func (p *Processor) getInputBalances(ctx context.Context, txInputs []*types.TxIn) (map[int]map[runes.RuneId]*entity.OutPointBalance, error) { - inputBalances := make(map[int]map[runes.RuneId]*entity.OutPointBalance) - for i, txIn := range txInputs { - balances, err := p.getRunesBalancesAtOutPoint(ctx, wire.OutPoint{ - Hash: txIn.PreviousOutTxHash, - Index: txIn.PreviousOutIndex, - }) - if err != nil { - return nil, errors.Wrap(err, "failed to get runes balances at outpoint") +func (p *Processor) getInputBalances(ctx context.Context, txs []*types.Transaction) (map[chainhash.Hash]map[int]map[runes.RuneId]*entity.OutPointBalance, error) { + inputBalances := make(map[chainhash.Hash]map[int]map[runes.RuneId]*entity.OutPointBalance) + outPoints := make([]wire.OutPoint, 0) + for _, tx := range txs { + for _, txIn := range tx.TxIn { + outPoints = append(outPoints, wire.OutPoint{ + Hash: txIn.PreviousOutTxHash, + Index: txIn.PreviousOutIndex, + }) } - - if len(balances) > 0 { - inputBalances[i] = balances + } + outPointBalances, err := p.getRunesBalancesAtOutPointBatch(ctx, outPoints) + if err != nil { + return nil, errors.Wrap(err, "failed to get runes balances at outpoints") + } + for _, tx := range txs { + inputBalances[tx.TxHash] = make(map[int]map[runes.RuneId]*entity.OutPointBalance) + for inputIndex, txIn := range tx.TxIn { + outPoint := wire.OutPoint{ + Hash: txIn.PreviousOutTxHash, + Index: txIn.PreviousOutIndex, + } + balancesInOutPoint := outPointBalances[outPoint] + if len(balancesInOutPoint) > 0 { + inputBalances[tx.TxHash][inputIndex] = balancesInOutPoint + } } } return inputBalances, nil @@ -674,18 +692,30 @@ func (p *Processor) isRuneExists(ctx context.Context, rune runes.Rune) (bool, er return true, nil } -func (p *Processor) getRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]*entity.OutPointBalance, error) { - if outPointBalances, ok := p.newOutPointBalances[outPoint]; ok { - balances := make(map[runes.RuneId]*entity.OutPointBalance) - for _, outPointBalance := range outPointBalances { - balances[outPointBalance.RuneId] = outPointBalance +func (p *Processor) getRunesBalancesAtOutPointBatch(ctx context.Context, outPoints []wire.OutPoint) (map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance, error) { + balances := make(map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance) + outPointsToFetch := make([]wire.OutPoint, 0) + for _, outPoint := range outPoints { + balances[outPoint] = make(map[runes.RuneId]*entity.OutPointBalance) + if outPointBalances, ok := p.newOutPointBalances[outPoint]; ok { + for _, outPointBalance := range outPointBalances { + balances[outPoint][outPointBalance.RuneId] = outPointBalance + } + } else { + outPointsToFetch = append(outPointsToFetch, outPoint) } - return balances, nil } - - balances, err := p.runesDg.GetRunesBalancesAtOutPoint(ctx, outPoint) - if err != nil { - return nil, errors.Wrap(err, "failed to get runes balances at outpoint") + if len(outPointsToFetch) > 0 { + fetchedBalances, err := p.runesDg.GetRunesBalancesAtOutPointBatch(ctx, outPointsToFetch) + if err != nil { + return nil, errors.Wrap(err, "failed to get runes balances at outpoints batch") + } + for outPoint, fetchedBalancesByRuneId := range fetchedBalances { + balances[outPoint] = make(map[runes.RuneId]*entity.OutPointBalance) + for runeId, balance := range fetchedBalancesByRuneId { + balances[outPoint][runeId] = balance + } + } } return balances, nil } diff --git a/modules/runes/repository/postgres/gen/batch.go b/modules/runes/repository/postgres/gen/batch.go index 177306b..e2bb274 100644 --- a/modules/runes/repository/postgres/gen/batch.go +++ b/modules/runes/repository/postgres/gen/batch.go @@ -128,3 +128,75 @@ func (b *CreateRuneBalanceAtBlockBatchResults) Close() error { b.closed = true return b.br.Close() } + +const getOutPointBalancesAtOutPointBatch = `-- name: GetOutPointBalancesAtOutPointBatch :batchmany +SELECT rune_id, pkscript, tx_hash, tx_idx, amount, block_height, spent_height FROM runes_outpoint_balances WHERE tx_hash = $1 AND tx_idx = $2 +` + +type GetOutPointBalancesAtOutPointBatchBatchResults struct { + br pgx.BatchResults + tot int + closed bool +} + +type GetOutPointBalancesAtOutPointBatchParams struct { + TxHash string + TxIdx int32 +} + +func (q *Queries) GetOutPointBalancesAtOutPointBatch(ctx context.Context, arg []GetOutPointBalancesAtOutPointBatchParams) *GetOutPointBalancesAtOutPointBatchBatchResults { + batch := &pgx.Batch{} + for _, a := range arg { + vals := []interface{}{ + a.TxHash, + a.TxIdx, + } + batch.Queue(getOutPointBalancesAtOutPointBatch, vals...) + } + br := q.db.SendBatch(ctx, batch) + return &GetOutPointBalancesAtOutPointBatchBatchResults{br, len(arg), false} +} + +func (b *GetOutPointBalancesAtOutPointBatchBatchResults) Query(f func(int, []RunesOutpointBalance, error)) { + defer b.br.Close() + for t := 0; t < b.tot; t++ { + var items []RunesOutpointBalance + if b.closed { + if f != nil { + f(t, items, ErrBatchAlreadyClosed) + } + continue + } + err := func() error { + rows, err := b.br.Query() + if err != nil { + return err + } + defer rows.Close() + for rows.Next() { + var i RunesOutpointBalance + if err := rows.Scan( + &i.RuneID, + &i.Pkscript, + &i.TxHash, + &i.TxIdx, + &i.Amount, + &i.BlockHeight, + &i.SpentHeight, + ); err != nil { + return err + } + items = append(items, i) + } + return rows.Err() + }() + if f != nil { + f(t, items, err) + } + } +} + +func (b *GetOutPointBalancesAtOutPointBatchBatchResults) Close() error { + b.closed = true + return b.br.Close() +} diff --git a/modules/runes/repository/postgres/runes.go b/modules/runes/repository/postgres/runes.go index e92e0f9..119be9c 100644 --- a/modules/runes/repository/postgres/runes.go +++ b/modules/runes/repository/postgres/runes.go @@ -110,6 +110,35 @@ func (r *Repository) GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wi return result, nil } +func (r *Repository) GetRunesBalancesAtOutPointBatch(ctx context.Context, outPoints []wire.OutPoint) (map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance, error) { + params := lo.Map(outPoints, func(outPoint wire.OutPoint, _ int) gen.GetOutPointBalancesAtOutPointBatchParams { + return gen.GetOutPointBalancesAtOutPointBatchParams{ + TxHash: outPoint.Hash.String(), + TxIdx: int32(outPoint.Index), + } + }) + queryResults := r.queries.GetOutPointBalancesAtOutPointBatch(ctx, params) + + var errorList []error + result := make(map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance) + queryResults.Query(func(i int, balances []gen.RunesOutpointBalance, err error) { + if err != nil { + errorList = append(errorList, errors.Wrap(err, "error during query")) + return + } + result[outPoints[i]] = make(map[runes.RuneId]*entity.OutPointBalance, len(balances)) + for _, balanceModel := range balances { + balance, err := mapOutPointBalanceModelToType(balanceModel) + if err != nil { + errorList = append(errorList, errors.Wrap(err, "failed to parse balance model")) + break + } + result[outPoints[i]][balance.RuneId] = &balance + } + }) + return result, nil +} + func (r *Repository) GetUnspentOutPointBalancesByPkScript(ctx context.Context, pkScript []byte, blockHeight uint64) ([]*entity.OutPointBalance, error) { balances, err := r.queries.GetUnspentOutPointBalancesByPkScript(ctx, gen.GetUnspentOutPointBalancesByPkScriptParams{ Pkscript: hex.EncodeToString(pkScript),