mirror of
https://github.com/alexgo-io/gaze-indexer.git
synced 2026-04-29 12:15:13 +08:00
fix: don't remove first block
This commit is contained in:
@@ -72,7 +72,7 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
|
||||
func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
ch := make(chan []*types.Block)
|
||||
logger.InfoContext(ctx, "[BitcoinIndexer] fetching blocks", slog.Int64("from", i.currentBlock.Height+1), slog.Int64("to", -1))
|
||||
subscription, err := i.Datasource.FetchAsync(ctx, i.currentBlock.Height, -1, ch)
|
||||
subscription, err := i.Datasource.FetchAsync(ctx, i.currentBlock.Height+1, -1, ch)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to call fetch async")
|
||||
}
|
||||
@@ -86,13 +86,13 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
continue
|
||||
}
|
||||
|
||||
// validate reorg from current block
|
||||
if blocks[0].Header.Height == i.currentBlock.Height {
|
||||
// validate reorg from first block
|
||||
{
|
||||
currentBlockHeader := blocks[0].Header
|
||||
if !currentBlockHeader.Hash.IsEqual(&i.currentBlock.Hash) {
|
||||
if !currentBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
|
||||
logger.WarnContext(ctx, "reorg detected",
|
||||
slogx.Stringer("current_hash", i.currentBlock.Hash),
|
||||
slogx.Stringer("expected_hash", currentBlockHeader.Hash),
|
||||
slogx.Stringer("expected_hash", currentBlockHeader.PrevBlock),
|
||||
)
|
||||
|
||||
// TODO: find start reorg block
|
||||
@@ -105,19 +105,13 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// remove old block
|
||||
newBlocks := blocks[1:]
|
||||
if len(newBlocks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// validate is block is continuous and no reorg
|
||||
for i := 1; i < len(newBlocks); i++ {
|
||||
if newBlocks[i].Header.Height != newBlocks[i-1].Header.Height+1 {
|
||||
return errors.Wrapf(errs.InternalError, "block is not continuous, block[%d] height: %d, block[%d] height: %d", i-1, newBlocks[i-1].Header.Height, i, newBlocks[i].Header.Height)
|
||||
for i := 1; i < len(blocks); i++ {
|
||||
if blocks[i].Header.Height != blocks[i-1].Header.Height+1 {
|
||||
return errors.Wrapf(errs.InternalError, "block is not continuous, block[%d] height: %d, block[%d] height: %d", i-1, blocks[i-1].Header.Height, i, blocks[i].Header.Height)
|
||||
}
|
||||
|
||||
if !newBlocks[i].Header.PrevBlock.IsEqual(&newBlocks[i-1].Header.Hash) {
|
||||
if !blocks[i].Header.PrevBlock.IsEqual(&blocks[i-1].Header.Hash) {
|
||||
logger.WarnContext(ctx, "reorg occurred while batch fetching blocks, need to try to fetch again")
|
||||
// end current round
|
||||
return nil
|
||||
@@ -125,12 +119,12 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
// Start processing blocks
|
||||
if err := i.Processor.Process(ctx, newBlocks); err != nil {
|
||||
if err := i.Processor.Process(ctx, blocks); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// Update current state
|
||||
i.currentBlock = newBlocks[len(newBlocks)-1].Header
|
||||
i.currentBlock = blocks[len(blocks)-1].Header
|
||||
case <-subscription.Done():
|
||||
// end current round
|
||||
if err := ctx.Err(); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user