feat(btc): add Fetch Async impl

This commit is contained in:
Gaze
2024-04-14 03:37:43 +07:00
parent 0607abba1f
commit 45e87601a6
3 changed files with 106 additions and 2 deletions

View File

@@ -6,6 +6,10 @@ import (
"github.com/btcsuite/btcd/rpcclient"
"github.com/cockroachdb/errors"
"github.com/gaze-network/indexer-network/core/types"
"github.com/gaze-network/indexer-network/pkg/logger"
"github.com/gaze-network/indexer-network/pkg/logger/slogx"
cstream "github.com/planxnx/concurrent-stream"
"github.com/samber/lo"
)
// Make sure to implement the BitcoinDatasource interface
@@ -57,7 +61,7 @@ func (d *BitcoinNodeDatasource) Fetch(ctx context.Context, from, to int64) ([]*t
// - from: block height to start fetching, if -1, it will start from genesis block
// - to: block height to stop fetching, if -1, it will fetch until the latest block
func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64, ch chan<- []*types.Block) (*ClientSubscription[[]*types.Block], error) {
_, _, skip, err := d.prepareRange(from, to)
from, to, skip, err := d.prepareRange(from, to)
if err != nil {
return nil, errors.Wrap(err, "failed to prepare fetch range")
}
@@ -67,9 +71,102 @@ func (d *BitcoinNodeDatasource) FetchAsync(ctx context.Context, from, to int64,
if err := subscription.UnsubscribeWithContext(ctx); err != nil {
return nil, errors.Wrap(err, "failed to unsubscribe")
}
return subscription, nil
}
// TODO: async fetching
// Create parallel stream
out := make(chan []*types.Block)
stream := cstream.NewStream(ctx, 8, out)
// create slice of block height to fetch
blockHeights := make([]int64, 0, to-from+1)
for i := from; i <= to; i++ {
blockHeights = append(blockHeights, i)
}
// Close stream when subscription is done or context is canceled
go func() {
defer stream.Close()
done := subscription.Done()
select {
case <-done:
case <-ctx.Done():
}
}()
// Fan-out blocks to subscription channel
go func() {
defer subscription.Unsubscribe()
for {
select {
case data, ok := <-out:
// stream closed
if !ok {
return
}
// empty blocks
if len(data) == 0 {
continue
}
// send blocks to subscription channel
if err := subscription.send(ctx, data); err != nil {
logger.ErrorContext(ctx, "failed while dispatch block", err,
slogx.Int64("start", data[0].Header.Height),
slogx.Int64("end", data[len(data)-1].Header.Height),
)
}
case <-ctx.Done():
return
}
}
}()
// Wait for stream to finish and close out channel
go func() {
_ = stream.Wait()
close(out)
}()
// Parallel fetch blocks from Bitcoin node until complete all block heights
// or subscription is done.
go func() {
done := subscription.Done()
chunks := lo.Chunk(blockHeights, 100)
for _, chunk := range chunks {
chunk := chunk
select {
case <-done:
return
default:
stream.Go(func() []*types.Block {
// TODO: should concurrent fetch block or not ?
blocks := make([]*types.Block, 0, len(chunk))
for _, height := range chunk {
hash, err := d.btcclient.GetBlockHash(height)
if err != nil {
logger.ErrorContext(ctx, "failed to get block hash", err, slogx.Int64("height", height))
if err := subscription.sendError(ctx, errors.Wrapf(err, "failed to get block hash: height: %d", height)); err != nil {
logger.ErrorContext(ctx, "failed to send error", err)
}
}
block, err := d.btcclient.GetBlock(hash)
if err != nil {
logger.ErrorContext(ctx, "failed to get block", err, slogx.Int64("height", height))
if err := subscription.sendError(ctx, errors.Wrapf(err, "failed to get block: height: %d, hash: %s", height, hash)); err != nil {
logger.ErrorContext(ctx, "failed to send error", err)
}
}
blocks = append(blocks, types.ParseMsgBlock(block, height))
}
return blocks
})
}
}
}()
return subscription, nil
}

1
go.mod
View File

@@ -34,6 +34,7 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/planxnx/concurrent-stream v0.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/crypto v0.17.0 // indirect

6
go.sum
View File

@@ -115,6 +115,12 @@ github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTw
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/planxnx/concurrent-stream v0.0.0-20240413192746-31d82be9e906 h1:2Mj9pI+AQyMlN7aLasm2Q66S+vMsKET2y6y+BUH6w6Y=
github.com/planxnx/concurrent-stream v0.0.0-20240413192746-31d82be9e906/go.mod h1:vxnW2qxkCLppMo5+Zns3b5/CiVxYQjXRLVFGJ9xvkXk=
github.com/planxnx/concurrent-stream v0.1.0 h1:IV/Z3xPuhUxsm05wDWBY2TtybMOx+HvQi3kkCZskKDI=
github.com/planxnx/concurrent-stream v0.1.0/go.mod h1:vxnW2qxkCLppMo5+Zns3b5/CiVxYQjXRLVFGJ9xvkXk=
github.com/planxnx/concurrent-stream v0.1.1 h1:U4XEYWsfqYkTXdLXOuoijHDnGHznX45jnJ3M58c544s=
github.com/planxnx/concurrent-stream v0.1.1/go.mod h1:vxnW2qxkCLppMo5+Zns3b5/CiVxYQjXRLVFGJ9xvkXk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=