@@ -1,4 +1,4 @@
-package indexers
+package indexer
 
 import (
 	"context"
@@ -16,20 +16,15 @@ import (
 
 const (
 	maxReorgLookBack = 1000
 	// pollingInterval is the default polling interval for the indexer polling worker
 	pollingInterval = 15 * time.Second
 )
 
-type (
-	BitcoinProcessor  Processor[[]*types.Block]
-	BitcoinDatasource datasources.Datasource[[]*types.Block]
-)
-
-// Make sure to implement the IndexerWorker interface
-var _ IndexerWorker = (*BitcoinIndexer)(nil)
-
-// BitcoinIndexer is the polling indexer for sync Bitcoin data to the database.
-type BitcoinIndexer struct {
-	Processor  BitcoinProcessor
-	Datasource BitcoinDatasource
+// Indexer is a generic indexer for fetching and processing data.
+type Indexer[T Input] struct {
+	Processor    Processor[T]
+	Datasource   datasources.Datasource[T]
 	currentBlock types.BlockHeader
 
 	quitOnce sync.Once
@@ -37,9 +32,9 @@ type BitcoinIndexer struct {
 	done chan struct{}
 }
 
-// NewBitcoinIndexer create new BitcoinIndexer
-func NewBitcoinIndexer(processor BitcoinProcessor, datasource BitcoinDatasource) *BitcoinIndexer {
-	return &BitcoinIndexer{
+// New creates a new generic indexer.
+func New[T Input](processor Processor[T], datasource datasources.Datasource[T]) *Indexer[T] {
+	return &Indexer[T]{
 		Processor:  processor,
 		Datasource: datasource,
 
@@ -48,21 +43,17 @@ func NewBitcoinIndexer(processor BitcoinProcessor, datasource BitcoinDatasource)
 	}
 }
 
-func (*BitcoinIndexer) Type() string {
-	return "bitcoin"
-}
-
-func (i *BitcoinIndexer) Shutdown() error {
+func (i *Indexer[T]) Shutdown() error {
 	return i.ShutdownWithContext(context.Background())
 }
 
-func (i *BitcoinIndexer) ShutdownWithTimeout(timeout time.Duration) error {
+func (i *Indexer[T]) ShutdownWithTimeout(timeout time.Duration) error {
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	return i.ShutdownWithContext(ctx)
 }
 
-func (i *BitcoinIndexer) ShutdownWithContext(ctx context.Context) (err error) {
+func (i *Indexer[T]) ShutdownWithContext(ctx context.Context) (err error) {
 	i.quitOnce.Do(func() {
 		close(i.quit)
 		select {
@@ -76,12 +67,11 @@ func (i *BitcoinIndexer) ShutdownWithContext(ctx context.Context) (err error) {
 	return
 }
 
-func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
+func (i *Indexer[T]) Run(ctx context.Context) (err error) {
 	defer close(i.done)
 
 	ctx = logger.WithContext(ctx,
 		slog.String("package", "indexers"),
-		slog.String("indexer", i.Type()),
 		slog.String("processor", i.Processor.Name()),
 		slog.String("datasource", i.Datasource.Name()),
 	)
@@ -114,15 +104,15 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
 	}
 }
 
-func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
+func (i *Indexer[T]) process(ctx context.Context) (err error) {
 	// height range to fetch data
 	from, to := i.currentBlock.Height+1, int64(-1)
 
-	logger.InfoContext(ctx, "Start fetching bitcoin blocks", slog.Int64("from", from))
-	ch := make(chan []*types.Block)
+	logger.InfoContext(ctx, "Start fetching input data", slog.Int64("from", from))
+	ch := make(chan []T)
 	subscription, err := i.Datasource.FetchAsync(ctx, from, to, ch)
 	if err != nil {
-		return errors.Wrap(err, "failed to fetch data")
+		return errors.Wrap(err, "failed to fetch input data")
 	}
 	defer subscription.Unsubscribe()
@@ -130,21 +120,24 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
 		select {
 		case <-i.quit:
 			return nil
-		case blocks := <-ch:
-			// empty blocks
-			if len(blocks) == 0 {
+		case inputs := <-ch:
+			// empty inputs
+			if len(inputs) == 0 {
 				continue
 			}
 
+			firstInput := inputs[0]
+			firstInputHeader := firstInput.BlockHeader()
+
 			startAt := time.Now()
 			ctx := logger.WithContext(ctx,
-				slogx.Int64("from", blocks[0].Header.Height),
-				slogx.Int64("to", blocks[len(blocks)-1].Header.Height),
+				slogx.Int64("from", firstInputHeader.Height),
+				slogx.Int64("to", inputs[len(inputs)-1].BlockHeader().Height),
 			)
 
-			// validate reorg from first block
+			// validate reorg from first input
 			{
-				remoteBlockHeader := blocks[0].Header
+				remoteBlockHeader := firstInputHeader
 				if !remoteBlockHeader.PrevBlock.IsEqual(&i.currentBlock.Hash) {
 					logger.WarnContext(ctx, "Detected chain reorganization. Searching for fork point...",
 						slogx.String("event", "reorg_detected"),
@@ -210,33 +203,35 @@ func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
 				}
 			}
 
-			// validate is block is continuous and no reorg
-			for i := 1; i < len(blocks); i++ {
-				if blocks[i].Header.Height != blocks[i-1].Header.Height+1 {
-					return errors.Wrapf(errs.InternalError, "block is not continuous, block[%d] height: %d, block[%d] height: %d", i-1, blocks[i-1].Header.Height, i, blocks[i].Header.Height)
+			// validate that inputs are continuous and no reorg occurred
+			for i := 1; i < len(inputs); i++ {
+				header := inputs[i].BlockHeader()
+				prevHeader := inputs[i-1].BlockHeader()
+				if header.Height != prevHeader.Height+1 {
+					return errors.Wrapf(errs.InternalError, "input is not continuous, input[%d] height: %d, input[%d] height: %d", i-1, prevHeader.Height, i, header.Height)
 				}
 
-				if !blocks[i].Header.PrevBlock.IsEqual(&blocks[i-1].Header.Hash) {
-					logger.WarnContext(ctx, "Chain Reorganization occurred in the middle of batch fetching blocks, need to try to fetch again")
+				if !header.PrevBlock.IsEqual(&prevHeader.Hash) {
+					logger.WarnContext(ctx, "Chain Reorganization occurred in the middle of batch fetching inputs, need to try to fetch again")
 
 					// end current round
 					return nil
 				}
 			}
 
-			ctx = logger.WithContext(ctx, slog.Int("total_blocks", len(blocks)))
+			ctx = logger.WithContext(ctx, slog.Int("total_inputs", len(inputs)))
 
-			// Start processing blocks
-			logger.InfoContext(ctx, "Processing blocks")
-			if err := i.Processor.Process(ctx, blocks); err != nil {
+			// Start processing inputs
+			logger.InfoContext(ctx, "Processing inputs")
+			if err := i.Processor.Process(ctx, inputs); err != nil {
 				return errors.WithStack(err)
 			}
 
 			// Update current state
-			i.currentBlock = blocks[len(blocks)-1].Header
+			i.currentBlock = inputs[len(inputs)-1].BlockHeader()
 
-			logger.InfoContext(ctx, "Processed blocks successfully",
-				slogx.String("event", "processed_blocks"),
+			logger.InfoContext(ctx, "Processed inputs successfully",
+				slogx.String("event", "processed_inputs"),
 				slogx.Int64("current_block", i.currentBlock.Height),
 				slogx.Duration("duration", time.Since(startAt)),
 			)
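
For reference, a minimal sketch of how the refactored pieces fit together. The Input constraint itself does not appear in this diff; the shape below is inferred from the only method the indexer calls on T (BlockHeader()), so treat it as an illustration rather than the repository's actual definition.

	// Assumed shape of the Input constraint behind Indexer[T Input]:
	// any element that can report the block header it belongs to.
	type Input interface {
		BlockHeader() types.BlockHeader
	}

Under that assumption, a caller that previously constructed a BitcoinIndexer would now instantiate the generic type, with T inferred from the arguments:

	// blockProcessor and blockDatasource are hypothetical values satisfying
	// Processor[*types.Block] and datasources.Datasource[*types.Block],
	// assuming *types.Block implements the Input constraint above.
	idx := indexer.New(blockProcessor, blockDatasource)
	if err := idx.Run(ctx); err != nil {
		// Run returns on shutdown or on a non-recoverable processing error.
	}
	_ = idx.ShutdownWithTimeout(pollingInterval) // bounded graceful shutdown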