Merge branch 'feature/bitcoin-indexer' into feat/runes-module

This commit is contained in:
Gaze
2024-04-15 14:20:32 +07:00
3 changed files with 8 additions and 2 deletions

View File

@@ -165,6 +165,7 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
logger.ErrorContext(ctx, "failed to send error", slogx.Error(err))
}
}
logger.DebugContext(ctx, "[BitcoinNodeDatasource] Fetched block", slogx.Int64("height", height), slogx.String("hash", hash.String()))
blocks = append(blocks, types.ParseMsgBlock(block, height))
}

View File

@@ -33,13 +33,17 @@ type ClientSubscription[T any] struct {
}
func newClientSubscription[T any](channel chan<- T) *ClientSubscription[T] {
return &ClientSubscription[T]{
subscription := &ClientSubscription[T]{
channel: channel,
in: make(chan T, ClientSubscriptionBufferSize),
err: make(chan error, ClientSubscriptionBufferSize),
quit: make(chan struct{}),
quitDone: make(chan struct{}),
}
go func() {
subscription.run()
}()
return subscription
}
func (c *ClientSubscription[T]) Unsubscribe() {

View File

@@ -59,7 +59,7 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
case <-ctx.Done():
return nil
case <-ticker.C:
ctx = logger.WithContext(ctx, slog.Int64("current_block_height", i.currentBlock.Height))
ctx := logger.WithContext(ctx, slog.Int64("current_block_height", i.currentBlock.Height))
if err := i.process(ctx); err != nil {
logger.ErrorContext(ctx, "failed to process", slogx.Error(err))
@@ -71,6 +71,7 @@ func (i *BitcoinIndexer) Run(ctx context.Context) (err error) {
func (i *BitcoinIndexer) process(ctx context.Context) (err error) {
ch := make(chan []*types.Block)
logger.InfoContext(ctx, "[BitcoinIndexer] fetching blocks", slog.Int64("from", i.currentBlock.Height+1), slog.Int64("to", -1))
subscription, err := i.Datasource.FetchAsync(ctx, i.currentBlock.Height, -1, ch)
if err != nil {
return errors.Wrap(err, "failed to call fetch async")