feat(btc): reorg handling

Co-authored-by: Gaze <dev@gaze.network>
This commit is contained in:
Gaze
2024-04-16 06:07:42 +07:00
parent 4c925c6071
commit 5c009aee5e
2 changed files with 62 additions and 17 deletions

View File

@@ -13,6 +13,10 @@ import (
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
)
const (
maxReorgLookBack = 100
)
type (
BitcoinProcessor Processor[[]*types.Block]
BitcoinDatasource datasources.Datasource[[]*types.Block]
@@ -88,19 +92,55 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
// validate reorg from first block
{
currentBlockHeader := blocks[0].Header
if !currentBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
remoteBlockHeader := blocks[0].Header
if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
logger.WarnContext(ctx, "reorg detected",
slogx.Stringer("current_hash", i.currentBlock.Hash),
slogx.Stringer("expected_hash", currentBlockHeader.PrevBlock),
slogx.Stringer("expected_hash", remoteBlockHeader.PrevBlock),
)
// TODO: find start reorg block
startReorgHeight := currentBlockHeader.Height // TODO: use start reorg block height
if err := i.Processor.RevertData(ctx, startReorgHeight); err != nil {
var (
targetHeight = i.currentBlock.Height - 1
beforeReorgBlockHeader = types.BlockHeader{
Height: -1,
}
)
for n := 0; n < maxReorgLookBack; n++ {
// TODO: concurrent fetch
indexedHeader, err := i.Processor.GetIndexedBlock(ctx, targetHeight)
if err != nil {
return errors.Wrapf(err, "failed to get indexed block, height: %d", targetHeight)
}
remoteHeader, err := i.Datasource.GetBlockHeader(ctx, targetHeight)
if err != nil {
return errors.Wrapf(err, "failed to get remote block header, height: %d", targetHeight)
}
// Found no reorg block
if indexedHeader.Hash.IsEqual(&remoteHeader.Hash) {
beforeReorgBlockHeader = remoteHeader
break
}
// Walk back to find fork point
targetHeight -= 1
}
// Reorg look back limit reached
if beforeReorgBlockHeader.Height < 0 {
return errors.Wrap(errs.SomethingWentWrong, "reorg look back limit reached")
}
// Revert all data since the reorg block
logger.WarnContext(ctx, "reverting reorg data", slogx.Int64("from", beforeReorgBlockHeader.Height+1))
if err := i.Processor.RevertData(ctx, beforeReorgBlockHeader.Height+1); err != nil {
return errors.Wrap(err, "failed to revert data")
}
i.currentBlock = currentBlockHeader
// Set current block to before reorg block and
// end current round to fetch again
i.currentBlock = beforeReorgBlockHeader
return nil
}
}

View File

@@ -19,6 +19,20 @@ type BlockHeader struct {
Nonce uint32
}
func ParseMsgBlockHeader(src wire.BlockHeader, height int64) BlockHeader {
hash := src.BlockHash()
return BlockHeader{
Hash: hash,
Height: height,
Version: src.Version,
PrevBlock: src.PrevBlock,
MerkleRoot: src.MerkleRoot,
Timestamp: src.Timestamp,
Bits: src.Bits,
Nonce: src.Nonce,
}
}
type Block struct {
Header BlockHeader
Transactions []*Transaction
@@ -27,16 +41,7 @@ type Block struct {
func ParseMsgBlock(src *wire.MsgBlock, height int64) *Block {
hash := src.Header.BlockHash()
return &Block{
Header: BlockHeader{
Hash: hash,
Height: height,
Version: src.Header.Version,
PrevBlock: src.Header.PrevBlock,
MerkleRoot: src.Header.MerkleRoot,
Timestamp: src.Header.Timestamp,
Bits: src.Header.Bits,
Nonce: src.Header.Nonce,
},
Header: ParseMsgBlockHeader(src.Header, height),
Transactions: lo.Map(src.Transactions, func(item *wire.MsgTx, index int) *Transaction { return ParseMsgTx(item, height, hash, uint32(index)) }),
}
}