feat: optimize runes processor by batching GetInputBalance

This commit is contained in:
Gaze
2024-04-20 19:15:37 +07:00
parent 60ac0c3580
commit 51de5e945d
5 changed files with 164 additions and 29 deletions

View File

@@ -16,6 +16,9 @@ SELECT * FROM runes_balances WHERE pkscript = $1 AND rune_id = $2 AND block_heig
-- name: GetOutPointBalancesAtOutPoint :many
SELECT * FROM runes_outpoint_balances WHERE tx_hash = $1 AND tx_idx = $2;
-- name: GetOutPointBalancesAtOutPointBatch :batchmany
SELECT * FROM runes_outpoint_balances WHERE tx_hash = $1 AND tx_idx = $2;
-- name: GetUnspentOutPointBalancesByPkScript :many
SELECT * FROM runes_outpoint_balances WHERE pkscript = @pkScript AND block_height <= @block_height AND (spent_height IS NULL OR spent_height > @block_height);

View File

@@ -29,6 +29,7 @@ type RunesReaderDataGateway interface {
GetRuneTransactionsByHeight(ctx context.Context, height uint64) ([]*entity.RuneTransaction, error)
GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]*entity.OutPointBalance, error)
GetRunesBalancesAtOutPointBatch(ctx context.Context, outPoints []wire.OutPoint) (map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance, error)
GetUnspentOutPointBalancesByPkScript(ctx context.Context, pkScript []byte, blockHeight uint64) ([]*entity.OutPointBalance, error)
// GetRuneIdFromRune returns the RuneId for the given rune. Returns errs.NotFound if the rune entry is not found.
GetRuneIdFromRune(ctx context.Context, rune runes.Rune) (runes.RuneId, error)

View File

@@ -23,12 +23,22 @@ import (
)
func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error {
// collect txs to batch get input balances
txs := make([]*types.Transaction, 0)
for _, block := range blocks {
txs = append(txs, block.Transactions...)
}
inputBalances, err := p.getInputBalances(ctx, txs)
if err != nil {
return errors.Wrap(err, "failed to get input balances")
}
for _, block := range blocks {
ctx := logger.WithContext(ctx, slog.Int("block_height", int(block.Header.Height)))
logger.DebugContext(ctx, "[RunesProcessor] Processing block", slog.Int("txs", len(block.Transactions)))
for _, tx := range block.Transactions {
if err := p.processTx(ctx, tx, block.Header); err != nil {
if err := p.processTx(ctx, tx, block.Header, inputBalances[tx.TxHash]); err != nil {
return errors.Wrap(err, "failed to process tx")
}
}
@@ -39,7 +49,7 @@ func (p *Processor) Process(ctx context.Context, blocks []*types.Block) error {
return nil
}
func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockHeader types.BlockHeader) error {
func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockHeader types.BlockHeader, inputBalances map[int]map[runes.RuneId]*entity.OutPointBalance) error {
if tx.BlockHeight < int64(runes.FirstRuneHeight(p.network)) {
// prevent processing txs before the activation height
return nil
@@ -49,11 +59,6 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
return errors.Wrap(err, "failed to decipher runestone")
}
inputBalances, err := p.getInputBalances(ctx, tx.TxIn)
if err != nil {
return errors.Wrap(err, "failed to get input balances")
}
if runestone == nil && len(inputBalances) == 0 {
// no runes involved in this tx
return nil
@@ -319,19 +324,32 @@ func (p *Processor) processTx(ctx context.Context, tx *types.Transaction, blockH
return nil
}
func (p *Processor) getInputBalances(ctx context.Context, txInputs []*types.TxIn) (map[int]map[runes.RuneId]*entity.OutPointBalance, error) {
inputBalances := make(map[int]map[runes.RuneId]*entity.OutPointBalance)
for i, txIn := range txInputs {
balances, err := p.getRunesBalancesAtOutPoint(ctx, wire.OutPoint{
func (p *Processor) getInputBalances(ctx context.Context, txs []*types.Transaction) (map[chainhash.Hash]map[int]map[runes.RuneId]*entity.OutPointBalance, error) {
inputBalances := make(map[chainhash.Hash]map[int]map[runes.RuneId]*entity.OutPointBalance)
outPoints := make([]wire.OutPoint, 0)
for _, tx := range txs {
for _, txIn := range tx.TxIn {
outPoints = append(outPoints, wire.OutPoint{
Hash: txIn.PreviousOutTxHash,
Index: txIn.PreviousOutIndex,
})
if err != nil {
return nil, errors.Wrap(err, "failed to get runes balances at outpoint")
}
if len(balances) > 0 {
inputBalances[i] = balances
}
outPointBalances, err := p.getRunesBalancesAtOutPointBatch(ctx, outPoints)
if err != nil {
return nil, errors.Wrap(err, "failed to get runes balances at outpoints")
}
for _, tx := range txs {
inputBalances[tx.TxHash] = make(map[int]map[runes.RuneId]*entity.OutPointBalance)
for inputIndex, txIn := range tx.TxIn {
outPoint := wire.OutPoint{
Hash: txIn.PreviousOutTxHash,
Index: txIn.PreviousOutIndex,
}
balancesInOutPoint := outPointBalances[outPoint]
if len(balancesInOutPoint) > 0 {
inputBalances[tx.TxHash][inputIndex] = balancesInOutPoint
}
}
}
return inputBalances, nil
@@ -674,18 +692,30 @@ func (p *Processor) isRuneExists(ctx context.Context, rune runes.Rune) (bool, er
return true, nil
}
func (p *Processor) getRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]*entity.OutPointBalance, error) {
func (p *Processor) getRunesBalancesAtOutPointBatch(ctx context.Context, outPoints []wire.OutPoint) (map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance, error) {
balances := make(map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance)
outPointsToFetch := make([]wire.OutPoint, 0)
for _, outPoint := range outPoints {
balances[outPoint] = make(map[runes.RuneId]*entity.OutPointBalance)
if outPointBalances, ok := p.newOutPointBalances[outPoint]; ok {
balances := make(map[runes.RuneId]*entity.OutPointBalance)
for _, outPointBalance := range outPointBalances {
balances[outPointBalance.RuneId] = outPointBalance
balances[outPoint][outPointBalance.RuneId] = outPointBalance
}
return balances, nil
} else {
outPointsToFetch = append(outPointsToFetch, outPoint)
}
balances, err := p.runesDg.GetRunesBalancesAtOutPoint(ctx, outPoint)
}
if len(outPointsToFetch) > 0 {
fetchedBalances, err := p.runesDg.GetRunesBalancesAtOutPointBatch(ctx, outPointsToFetch)
if err != nil {
return nil, errors.Wrap(err, "failed to get runes balances at outpoint")
return nil, errors.Wrap(err, "failed to get runes balances at outpoints batch")
}
for outPoint, fetchedBalancesByRuneId := range fetchedBalances {
balances[outPoint] = make(map[runes.RuneId]*entity.OutPointBalance)
for runeId, balance := range fetchedBalancesByRuneId {
balances[outPoint][runeId] = balance
}
}
}
return balances, nil
}

View File

@@ -128,3 +128,75 @@ func (b *CreateRuneBalanceAtBlockBatchResults) Close() error {
b.closed = true
return b.br.Close()
}
const getOutPointBalancesAtOutPointBatch = `-- name: GetOutPointBalancesAtOutPointBatch :batchmany
SELECT rune_id, pkscript, tx_hash, tx_idx, amount, block_height, spent_height FROM runes_outpoint_balances WHERE tx_hash = $1 AND tx_idx = $2
`
type GetOutPointBalancesAtOutPointBatchBatchResults struct {
br pgx.BatchResults
tot int
closed bool
}
type GetOutPointBalancesAtOutPointBatchParams struct {
TxHash string
TxIdx int32
}
func (q *Queries) GetOutPointBalancesAtOutPointBatch(ctx context.Context, arg []GetOutPointBalancesAtOutPointBatchParams) *GetOutPointBalancesAtOutPointBatchBatchResults {
batch := &pgx.Batch{}
for _, a := range arg {
vals := []interface{}{
a.TxHash,
a.TxIdx,
}
batch.Queue(getOutPointBalancesAtOutPointBatch, vals...)
}
br := q.db.SendBatch(ctx, batch)
return &GetOutPointBalancesAtOutPointBatchBatchResults{br, len(arg), false}
}
func (b *GetOutPointBalancesAtOutPointBatchBatchResults) Query(f func(int, []RunesOutpointBalance, error)) {
defer b.br.Close()
for t := 0; t < b.tot; t++ {
var items []RunesOutpointBalance
if b.closed {
if f != nil {
f(t, items, ErrBatchAlreadyClosed)
}
continue
}
err := func() error {
rows, err := b.br.Query()
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
var i RunesOutpointBalance
if err := rows.Scan(
&i.RuneID,
&i.Pkscript,
&i.TxHash,
&i.TxIdx,
&i.Amount,
&i.BlockHeight,
&i.SpentHeight,
); err != nil {
return err
}
items = append(items, i)
}
return rows.Err()
}()
if f != nil {
f(t, items, err)
}
}
}
func (b *GetOutPointBalancesAtOutPointBatchBatchResults) Close() error {
b.closed = true
return b.br.Close()
}

View File

@@ -110,6 +110,35 @@ func (r *Repository) GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wi
return result, nil
}
func (r *Repository) GetRunesBalancesAtOutPointBatch(ctx context.Context, outPoints []wire.OutPoint) (map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance, error) {
params := lo.Map(outPoints, func(outPoint wire.OutPoint, _ int) gen.GetOutPointBalancesAtOutPointBatchParams {
return gen.GetOutPointBalancesAtOutPointBatchParams{
TxHash: outPoint.Hash.String(),
TxIdx: int32(outPoint.Index),
}
})
queryResults := r.queries.GetOutPointBalancesAtOutPointBatch(ctx, params)
var errorList []error
result := make(map[wire.OutPoint]map[runes.RuneId]*entity.OutPointBalance)
queryResults.Query(func(i int, balances []gen.RunesOutpointBalance, err error) {
if err != nil {
errorList = append(errorList, errors.Wrap(err, "error during query"))
return
}
result[outPoints[i]] = make(map[runes.RuneId]*entity.OutPointBalance, len(balances))
for _, balanceModel := range balances {
balance, err := mapOutPointBalanceModelToType(balanceModel)
if err != nil {
errorList = append(errorList, errors.Wrap(err, "failed to parse balance model"))
break
}
result[outPoints[i]][balance.RuneId] = &balance
}
})
return result, nil
}
func (r *Repository) GetUnspentOutPointBalancesByPkScript(ctx context.Context, pkScript []byte, blockHeight uint64) ([]*entity.OutPointBalance, error) {
balances, err := r.queries.GetUnspentOutPointBalancesByPkScript(ctx, gen.GetUnspentOutPointBalancesByPkScriptParams{
Pkscript: hex.EncodeToString(pkScript),