feat: implement current block

This commit is contained in:
Gaze
2024-04-12 00:02:03 +07:00
parent 2668108f2b
commit f444b679be
12 changed files with 139 additions and 30 deletions

View File

@@ -2,7 +2,7 @@ BEGIN;
DROP TABLE IF EXISTS "runes_indexer_stats";
DROP TABLE IF EXISTS "runes_indexer_db_version";
DROP TABLE IF EXISTS "runes_processor_state";
DROP TABLE IF EXISTS "runes_indexed_blocks";
DROP TABLE IF EXISTS "runes_entries";
DROP TABLE IF EXISTS "runes_outpoint_balances";
DROP TABLE IF EXISTS "runes_balances";

View File

@@ -13,7 +13,8 @@ CREATE TABLE IF NOT EXISTS "runes_indexer_stats" (
CREATE TABLE IF NOT EXISTS "runes_indexer_db_version" (
"id" BIGSERIAL PRIMARY KEY,
"version" INT NOT NULL,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
"event_hash_version" INT NOT NULL,
"created_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO "runes_indexer_db_version" ("version") VALUES (1);
@@ -21,9 +22,19 @@ INSERT INTO "runes_indexer_db_version" ("version") VALUES (1);
CREATE TABLE IF NOT EXISTS "runes_processor_state" (
"latest_block_height" INT NOT NULL,
"latest_block_hash" TEXT NOT NULL,
"latest_prev_block_hash" TEXT NOT NULL,
"updated_at" TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS "runes_indexed_blocks" (
"hash" TEXT NOT NULL PRIMARY KEY,
"height" INT NOT NULL,
"event_hash" TEXT NOT NULL,
"cumulative_event_hash" TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS runes_indexed_blocks_height_idx ON "runes_indexed_blocks" USING BTREE ("height" DESC);
CREATE TABLE IF NOT EXISTS "runes_entries" (
"rune_id" TEXT NOT NULL PRIMARY KEY,
"rune" TEXT NOT NULL,

View File

@@ -29,5 +29,11 @@ INSERT INTO runes_balances (pkscript, block_height, rune_id, amount) VALUES ($1,
-- name: GetRunesProcessorState :one
SELECT * FROM runes_processor_state;
-- name: UpdateLatestBlockHeight :exec
UPDATE runes_processor_state SET latest_block_height = $1;
-- name: UpdateLatestBlock :exec
UPDATE runes_processor_state SET latest_block_height = $1, latest_block_hash = $2, latest_prev_block_hash = $3;
-- name: CreateIndexedBlock :exec
INSERT INTO runes_indexed_blocks (hash, height, event_hash, cumulative_event_hash) VALUES ($1, $2, $3, $4);
-- name: DeleteIndexedBlockByHash :exec
DELETE FROM runes_indexed_blocks WHERE hash = $1;

View File

@@ -1,8 +1,11 @@
-- name: GetCurrentDBVersion :one
SELECT "version" FROM runes_indexer_db_version ORDER BY id DESC LIMIT 1;
-- name: GetCurrenEventHashVersion :one
SELECT "event_hash_version" FROM runes_indexer_db_version ORDER BY id DESC LIMIT 1;
-- name: GetCurrentIndexerStats :one
SELECT "client_version", "network" FROM runes_indexer_stats ORDER BY id DESC LIMIT 1;
-- name: UpdateIndexerStats :exec
INSERT INTO runes_indexer_stats (client_version, network) VALUES ($1, $2);
INSERT INTO runes_indexer_stats (client_version, network) VALUES ($1, $2);

View File

@@ -54,10 +54,11 @@ func (h *HttpHandler) GetBalancesByAddress(ctx *fiber.Ctx) (err error) {
blockHeight := req.BlockHeight
if blockHeight == 0 {
blockHeight, err = h.usecase.GetLatestBlockHeight(ctx.UserContext())
blockHeader, err := h.usecase.GetLatestBlock(ctx.UserContext())
if err != nil {
return errors.Wrap(err, "error during GetLatestBlockHeight")
return errors.Wrap(err, "error during GetLatestBlock")
}
blockHeight = uint64(blockHeader.Height)
}
balances, err := h.usecase.GetBalancesByPkScript(ctx.UserContext(), pkScript, blockHeight)

View File

@@ -4,6 +4,7 @@ import (
"context"
"github.com/btcsuite/btcd/wire"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/internal/runes"
"github.com/gaze-network/uint128"
@@ -15,7 +16,7 @@ type RunesDataGateway interface {
}
type RunesReaderDataGateway interface {
GetLatestBlockHeight(ctx context.Context) (uint64, error)
GetLatestBlock(ctx context.Context) (types.BlockHeader, error)
GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]uint128.Uint128, error)
// GetRuneEntryByRune returns the RuneEntry for the given rune. Returns errs.NotFound if the rune entry is not found.
@@ -45,7 +46,7 @@ type RunesWriterDataGateway interface {
SetRuneEntry(ctx context.Context, entry *runes.RuneEntry) error
CreateRuneBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint, balances map[runes.RuneId]uint128.Uint128) error
CreateRuneBalancesAtBlock(ctx context.Context, params []CreateRuneBalancesAtBlockParams) error
UpdateLatestBlockHeight(ctx context.Context, blockHeight uint64) error
UpdateLatestBlock(ctx context.Context, blockHeader types.BlockHeader) error
}
type CreateRuneBalancesAtBlockParams struct {

View File

@@ -11,6 +11,36 @@ import (
"github.com/jackc/pgx/v5/pgtype"
)
const createIndexedBlock = `-- name: CreateIndexedBlock :exec
INSERT INTO runes_indexed_blocks (hash, height, event_hash, cumulative_event_hash) VALUES ($1, $2, $3, $4)
`
type CreateIndexedBlockParams struct {
Hash string
Height int32
EventHash string
CumulativeEventHash string
}
func (q *Queries) CreateIndexedBlock(ctx context.Context, arg CreateIndexedBlockParams) error {
_, err := q.db.Exec(ctx, createIndexedBlock,
arg.Hash,
arg.Height,
arg.EventHash,
arg.CumulativeEventHash,
)
return err
}
const deleteIndexedBlockByHash = `-- name: DeleteIndexedBlockByHash :exec
DELETE FROM runes_indexed_blocks WHERE hash = $1
`
func (q *Queries) DeleteIndexedBlockByHash(ctx context.Context, hash string) error {
_, err := q.db.Exec(ctx, deleteIndexedBlockByHash, hash)
return err
}
const getBalancesByPkScript = `-- name: GetBalancesByPkScript :many
SELECT DISTINCT ON (rune_id) pkscript, block_height, rune_id, amount FROM runes_balances WHERE pkscript = $1 AND block_height <= $2 ORDER BY rune_id, block_height DESC
`
@@ -200,13 +230,18 @@ func (q *Queries) GetRuneEntriesByRunes(ctx context.Context, runes []string) ([]
}
const getRunesProcessorState = `-- name: GetRunesProcessorState :one
SELECT id, latest_block_height FROM runes_processor_state
SELECT latest_block_height, latest_block_hash, latest_prev_block_hash, updated_at FROM runes_processor_state
`
func (q *Queries) GetRunesProcessorState(ctx context.Context) (RunesProcessorState, error) {
row := q.db.QueryRow(ctx, getRunesProcessorState)
var i RunesProcessorState
err := row.Scan(&i.Id, &i.LatestBlockHeight)
err := row.Scan(
&i.LatestBlockHeight,
&i.LatestBlockHash,
&i.LatestPrevBlockHash,
&i.UpdatedAt,
)
return i, err
}
@@ -257,11 +292,17 @@ func (q *Queries) SetRuneEntry(ctx context.Context, arg SetRuneEntryParams) erro
return err
}
const updateLatestBlockHeight = `-- name: UpdateLatestBlockHeight :exec
UPDATE runes_processor_state SET latest_block_height = $1
const updateLatestBlock = `-- name: UpdateLatestBlock :exec
UPDATE runes_processor_state SET latest_block_height = $1, latest_block_hash = $2, latest_prev_block_hash = $3
`
func (q *Queries) UpdateLatestBlockHeight(ctx context.Context, latestBlockHeight int32) error {
_, err := q.db.Exec(ctx, updateLatestBlockHeight, latestBlockHeight)
type UpdateLatestBlockParams struct {
LatestBlockHeight int32
LatestBlockHash string
LatestPrevBlockHash string
}
func (q *Queries) UpdateLatestBlock(ctx context.Context, arg UpdateLatestBlockParams) error {
_, err := q.db.Exec(ctx, updateLatestBlock, arg.LatestBlockHeight, arg.LatestBlockHash, arg.LatestPrevBlockHash)
return err
}

View File

@@ -9,6 +9,17 @@ import (
"context"
)
const getCurrenEventHashVersion = `-- name: GetCurrenEventHashVersion :one
SELECT "event_hash_version" FROM runes_indexer_db_version ORDER BY id DESC LIMIT 1
`
func (q *Queries) GetCurrenEventHashVersion(ctx context.Context) (int32, error) {
row := q.db.QueryRow(ctx, getCurrenEventHashVersion)
var event_hash_version int32
err := row.Scan(&event_hash_version)
return event_hash_version, err
}
const getCurrentDBVersion = `-- name: GetCurrentDBVersion :one
SELECT "version" FROM runes_indexer_db_version ORDER BY id DESC LIMIT 1
`

View File

@@ -34,10 +34,18 @@ type RunesEntry struct {
CompletionTime pgtype.Timestamp
}
type RunesIndexedBlock struct {
Hash string
Height int32
EventHash string
CumulativeEventHash string
}
type RunesIndexerDbVersion struct {
Id int64
Version int32
CreatedAt pgtype.Timestamptz
Id int64
Version int32
EventHashVersion int32
CreatedAt pgtype.Timestamptz
}
type RunesIndexerStat struct {
@@ -58,6 +66,8 @@ type RunesOutpointBalance struct {
}
type RunesProcessorState struct {
Id int32
LatestBlockHeight int32
LatestBlockHeight int32
LatestBlockHash string
LatestPrevBlockHash string
UpdatedAt pgtype.Timestamp
}

View File

@@ -4,9 +4,11 @@ import (
"context"
"encoding/hex"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/modules/runes/internal/datagateway"
"github.com/gaze-network/indexer-network/modules/runes/internal/entity"
"github.com/gaze-network/indexer-network/modules/runes/internal/repository/postgres/gen"
@@ -18,12 +20,24 @@ import (
var _ datagateway.RunesDataGateway = (*Repository)(nil)
func (r *Repository) GetLatestBlockHeight(ctx context.Context) (uint64, error) {
func (r *Repository) GetLatestBlock(ctx context.Context) (types.BlockHeader, error) {
state, err := r.queries.GetRunesProcessorState(ctx)
if err != nil {
return 0, errors.Wrap(err, "error during query")
return types.BlockHeader{}, errors.Wrap(err, "error during query")
}
return uint64(state.LatestBlockHeight), nil
hash, err := chainhash.NewHashFromStr(state.LatestBlockHash)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "failed to parse block hash")
}
prevHash, err := chainhash.NewHashFromStr(state.LatestPrevBlockHash)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "failed to parse prev block hash")
}
return types.BlockHeader{
Height: int64(state.LatestBlockHeight),
Hash: *hash,
PrevBlock: *prevHash,
}, nil
}
func (r *Repository) GetRunesBalancesAtOutPoint(ctx context.Context, outPoint wire.OutPoint) (map[runes.RuneId]uint128.Uint128, error) {
@@ -176,8 +190,13 @@ func (r *Repository) CreateRuneBalancesAtBlock(ctx context.Context, params []dat
return nil
}
func (r *Repository) UpdateLatestBlockHeight(ctx context.Context, blockHeight uint64) error {
if err := r.queries.UpdateLatestBlockHeight(ctx, int32(blockHeight)); err != nil {
func (r *Repository) UpdateLatestBlock(ctx context.Context, blockHeader types.BlockHeader) error {
params := gen.UpdateLatestBlockParams{
LatestBlockHeight: int32(blockHeader.Height),
LatestBlockHash: blockHeader.Hash.String(),
LatestPrevBlockHash: blockHeader.PrevBlock.String(),
}
if err := r.queries.UpdateLatestBlock(ctx, params); err != nil {
return errors.Wrap(err, "error during exec")
}
return nil

View File

@@ -4,12 +4,13 @@ import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/core/types"
)
func (u *Usecase) GetLatestBlockHeight(ctx context.Context) (uint64, error) {
height, err := u.runesDg.GetLatestBlockHeight(ctx)
func (u *Usecase) GetLatestBlock(ctx context.Context) (types.BlockHeader, error) {
blockHeader, err := u.runesDg.GetLatestBlock(ctx)
if err != nil {
return 0, errors.Wrap(err, "failed to get latest block height")
return types.BlockHeader{}, errors.Wrap(err, "failed to get latest block")
}
return height, nil
return blockHeader, nil
}

View File

@@ -3,6 +3,7 @@ package runes
import (
"context"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common"
"github.com/gaze-network/indexer-network/core/indexers"
"github.com/gaze-network/indexer-network/core/types"
@@ -37,7 +38,11 @@ func (p *Processor) Name() string {
}
func (p *Processor) CurrentBlock(ctx context.Context) (types.BlockHeader, error) {
panic("implement me")
blockHeader, err := p.runesDg.GetLatestBlock(ctx)
if err != nil {
return types.BlockHeader{}, errors.Wrap(err, "failed to get latest block")
}
return blockHeader, nil
}
func (p *Processor) PrepareData(ctx context.Context, from, to int64) ([]*types.Block, error) {