fix: annoying error when unsubscribe fetcher

Co-authored-by: Gaze <dev@gaze.network>
This commit is contained in:
Gaze
2024-04-22 03:24:35 +07:00
parent 877d55fcff
commit 0bb56e6ef3
4 changed files with 13 additions and 2 deletions

View File

@@ -93,4 +93,7 @@ var (
// ConflictSetting is returned when an indexer setting is conflicted
ConflictSetting = errors.NewWithDepth(depth, "conflict setting")
// Closed is returned when a resource is closed
Closed = errors.NewWithDepth(depth, "closed")
)

View File

@@ -10,6 +10,7 @@ import (
"github.com/btcsuite/btcd/rpcclient"
"github.com/btcsuite/btcd/wire"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/subscription"
"github.com/gaze-network/indexer-network/pkg/logger"
@@ -129,6 +130,9 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
// send blocks to subscription channel
if err := subscription.Send(ctx, data); err != nil {
if errors.Is(err, errs.Closed) {
return
}
logger.WarnContext(ctx, "failed while dispatch block",
slogx.Error(err),
slogx.Int64("start", data[0].Header.Height),

View File

@@ -94,7 +94,7 @@ func (s *Subscription[T]) Send(ctx context.Context, value T) error {
select {
case s.in <- value:
case <-s.quitDone:
return errors.Wrap(errs.InternalError, "subscription is closed")
return errors.Wrap(errs.Closed, "subscription is closed")
case <-ctx.Done():
return errors.WithStack(ctx.Err())
}
@@ -106,7 +106,7 @@ func (s *Subscription[T]) SendError(ctx context.Context, err error) error {
select {
case s.err <- err:
case <-s.quitDone:
return errors.Wrap(errs.InternalError, "subscription is closed")
return errors.Wrap(errs.Closed, "subscription is closed")
case <-ctx.Done():
return errors.WithStack(ctx.Err())
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/common/errs"
"github.com/gaze-network/indexer-network/core/datasources"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/internal/subscription"
@@ -119,6 +120,9 @@ func (c *ClientDatabase) FetchAsync(ctx context.Context, from, to int64, ch chan
// send blocks to subscription channel
if err := subscription.Send(ctx, data); err != nil {
if errors.Is(err, errs.Closed) {
return
}
logger.WarnContext(ctx, "failed while dispatch block",
slogx.Error(err),
slogx.Int64("start", data[0].Header.Height),