Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat.check bitswap cid #49

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
with:
# list of Docker images to use as base name for tags
images: |
datapreservationprogram/retrievalbot
datapreservationprogram/spade-retrievalbot
# generate Docker tags based on the following events/attributes
tags: |
type=schedule
Expand Down
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,10 @@
/repdao_dp
/vendor
/spcoverage

#vscode
.vscode/*
*.code-workspace

#sandbox
*.sandbox.*
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# RetrievalBot
# ♠️ SpadeRetrievalBot

The goal of retrieval bot is to offer a scalable framework to perform retrieval testing over Filecoin network.

There is no centralized orchestrator to manage retrieval queue or work. Instead, it uses MongoDB to manage work queue as well as saving retrieval results.
This is a fork of the Data Programs [Retrieval Bot](https://github.com/data-preservation-programs/RetrievalBot) with customizations specific for the [Spade](https://github.com/data-preservation-programs/spade) platform.

## Workers
Workers refer to the unit that consumes the worker queue. There are 4 basic types of workers as of now.
Expand Down
6 changes: 2 additions & 4 deletions integration/spadev0/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package main
import (
"context"
"encoding/json"
"io/ioutil"
"io"
"math"
"math/rand"
"net/http"
"os"
"time"

logging "github.com/ipfs/go-log/v2"
_ "github.com/joho/godotenv/autoload"
Expand Down Expand Up @@ -118,7 +117,7 @@ func fetchActiveReplicas(ctx context.Context, url string) (*ActiveReplicas, erro

defer decompressor.Close()

data, err := ioutil.ReadAll(decompressor)
data, err := io.ReadAll(decompressor)
if err != nil {
return nil, errors.Wrap(err, "failed to read decompressed data")
}
Expand Down Expand Up @@ -167,7 +166,6 @@ func selectReplicasToTest(perProvider map[int]ProviderReplicas) map[int][]Replic
}

// Randomize which CIDs are selected
rand.Seed(time.Now().UnixNano())
indices := rand.Perm(maxReplicas)[:numCidsToTest]

for _, index := range indices {
Expand Down
5 changes: 2 additions & 3 deletions integration/spadev0/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,8 @@ func AddSpadeTasks(ctx context.Context, requester string, replicasToTest map[int
}

var spadev0Metadata map[string]string = map[string]string{
"retrieve_type": "spade",
"retrieve_size": "1048576",
// todo: specify # of cids to test per layer of the tree TBD
"retrieve_type": string(task.Spade),
"max_traverse_depth": "3",
}

func prepareTasksForSP(
Expand Down
165 changes: 164 additions & 1 deletion pkg/net/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import (
"context"
"math/rand"
"time"

"github.com/data-preservation-programs/RetrievalBot/pkg/task"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand All @@ -16,7 +19,16 @@
"github.com/libp2p/go-libp2p/core/routing"
"github.com/pkg/errors"
"golang.org/x/exp/slices"
"time"

_ "github.com/ipld/go-codec-dagpb"
ipld "github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/traversal"

_ "github.com/ipld/go-ipld-prime/codec/dagcbor"
_ "github.com/ipld/go-ipld-prime/codec/dagjson"
_ "github.com/ipld/go-ipld-prime/codec/raw"
)

type SingleContentRouter struct {
Expand Down Expand Up @@ -151,3 +163,154 @@
return task.NewErrorRetrievalResultWithErrorResolution(task.RetrievalFailure, err), nil
}
}

// Starts with the root CID, then fetches a random CID from the children and grandchildren nodes,
// until it reaches `traverseDepth` or hits a node with no links.
// Note: the root CID is considered depth `0`, so passing `traverseDepth=0` will only fetch the root CID
// Returns a `SuccessfulRetrievalResult` if *all* retrievals were successful, `ErrorRetrievalResult` if any failed
func (c BitswapClient) SpadeTraversal(parent context.Context,
target peer.AddrInfo,
startingCid cid.Cid,
maxTraverseDepth uint) (*task.RetrievalResult, error) {
logger := logging.Logger("bitswap_client_spade").With("cid", startingCid).With("target", target)
cidToRetrieve := startingCid

// Initialize hosts and clients required to do all the retrieval tests
network := bsnet.NewFromIpfsHost(c.host, SingleContentRouter{
AddrInfo: target,
})
bswap := bsclient.New(parent, network, blockstore.NewBlockstore(datastore.NewMapDatastore()))

defer bswap.Close()
defer network.Stop()

startTime := time.Now()

i := uint(0)
for {
// Retrieval
logger.Infof("retrieving %s\n", cidToRetrieve.String())
blk, err := c.RetrieveBlock(parent, target, network, bswap, cidToRetrieve)

if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.RetrievalFailure, err), nil
}

// Verify returned content hashes to the CID we're expecting
if !blk.Cid().Equals(cidToRetrieve) {
return task.NewErrorRetrievalResult(task.CIDMismatch, errors.Errorf("retrieved cid does not match requested: %s, %s", blk.Cid().String(), cidToRetrieve)), nil

Check failure on line 201 in pkg/net/bitswap.go

View workflow job for this annotation

GitHub Actions / build

line is 161 characters (lll)
}

if i == maxTraverseDepth {
var size = int64(len(blk.RawData()))
elapsed := time.Since(startTime)
logger.With("size", size).With("elapsed", elapsed).Info("Retrieved block")

// we've reached the requested depth of the tree
return task.NewSuccessfulRetrievalResult(elapsed, size, elapsed), nil
}

// if not at bottom of the tree, keep going down the links until we reach it or hit a dead end
links, err := FindLinks(parent, blk)
if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.CannotDecodeLinks, err), nil
}

logger.Debugf("cid %s has %d links\n", cidToRetrieve.String(), len(links))

if len(links) == 0 {
var size = int64(len(blk.RawData()))
elapsed := time.Since(startTime)
logger.With("size", size).With("elapsed", elapsed).Info("Retrieved block")

return task.NewSuccessfulRetrievalResult(elapsed, size, elapsed), nil
}

// randomly pick a link to go down
//nolint:all we don't need crypto secured random numbers
nextIndex := rand.Intn(len(links))

cidToRetrieve, err = cid.Parse(links[nextIndex].String())
if err != nil {
return task.NewErrorRetrievalResultWithErrorResolution(task.CIDCodecNotSupported, err), nil
}

i++ // To the next layer of the tree
}
}

// Returns the raw block data, the links, and error if any
// Takes in `network` and `bswap` client, so that it can be used in a loop for traversals / multiple retrievals
func (c BitswapClient) RetrieveBlock(
parent context.Context,
target peer.AddrInfo,
network bsnet.BitSwapNetwork,
bswap *bsclient.Client,
targetCid cid.Cid) (blocks.Block, error) {
logger := logging.Logger("bitswap_retrieve_block").With("cid", targetCid).With("target", target)

notFound := make(chan struct{})
network.Start(MessageReceiver{BSClient: bswap, MessageHandler: func(
ctx context.Context, sender peer.ID, incoming bsmsg.BitSwapMessage) {
if sender == target.ID && slices.Contains(incoming.DontHaves(), targetCid) {
logger.Info("Block not found")
close(notFound)
}
}})
connectContext, cancel := context.WithTimeout(parent, c.timeout)
defer cancel()
logger.Info("Connecting to target peer...")
err := c.host.Connect(connectContext, target)
if err != nil {
return nil, errors.Wrap(err, "failed to connect to target peer")
}

resultChan := make(chan blocks.Block)
errChan := make(chan error)
go func() {
logger.Debug("Retrieving block...")
blk, err := bswap.GetBlock(connectContext, targetCid)
if err != nil {
logger.Info(err)
errChan <- err
} else {
resultChan <- blk
}
}()
select {
case <-notFound:
return nil, errors.New("DONT_HAVE received from the target peer")

case blk := <-resultChan:
return blk, nil

case err := <-errChan:
return nil, errors.Wrap(err, "error received %s")
}
}

// Attempts to decode the block data into a node and return its links
func FindLinks(ctx context.Context, blk blocks.Block) ([]datamodel.Link, error) {
if blk.Cid().Prefix().Codec == cid.Raw {
// Note: this case will happen at the bottom of the tree
return []datamodel.Link{}, nil
}

decoder, err := cidlink.DefaultLinkSystem().DecoderChooser(cidlink.Link{Cid: blk.Cid()})

if err != nil {
return nil, err
}

node, err := ipld.Decode(blk.RawData(), decoder)
if err != nil {
return nil, err
}

links, err := traversal.SelectLinks(node)
if err != nil {
return nil, err
}

return links, nil
}
6 changes: 5 additions & 1 deletion pkg/task/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package task
import (
"context"
"errors"
"github.com/data-preservation-programs/RetrievalBot/pkg/requesterror"
"strings"

"github.com/data-preservation-programs/RetrievalBot/pkg/requesterror"
)

type ErrorCode string
Expand All @@ -26,8 +27,11 @@ const (
NotOnline ErrorCode = "not_online"
UnconfirmedBlockTransfer ErrorCode = "unconfirmed_block_transfer"
CIDCodecNotSupported ErrorCode = "cid_codec_not_supported"
CIDMismatch ErrorCode = "cid_mismatch"
ResponseRejected ErrorCode = "response_rejected"
DealStateMissing ErrorCode = "deal_state_missing"
CannotDecodeLinks ErrorCode = "cannot_decode_links"
CannotTraverse ErrorCode = "cannot_traverse"
)

var errorStringMap = map[string]ErrorCode{
Expand Down
10 changes: 9 additions & 1 deletion pkg/task/task.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package task

import (
"time"

"github.com/data-preservation-programs/RetrievalBot/pkg/convert"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"time"
)

type Provider struct {
Expand Down Expand Up @@ -44,6 +45,13 @@ const (
Bitswap ModuleName = "bitswap"
)

type RetrieveType string

const (
Any RetrieveType = "any"
Spade RetrieveType = "spade"
)

type Content struct {
CID string `bson:"cid"`
}
Expand Down
16 changes: 16 additions & 0 deletions worker/bitswap/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package bitswap

import (
"context"
"strconv"

"github.com/data-preservation-programs/RetrievalBot/pkg/convert"
"github.com/data-preservation-programs/RetrievalBot/pkg/model"
"github.com/data-preservation-programs/RetrievalBot/pkg/net"
Expand Down Expand Up @@ -83,6 +85,20 @@ func (e Worker) DoWork(tsk task.Task) (*task.RetrievalResult, error) {
return task.NewErrorRetrievalResult(task.ProtocolNotSupported, errors.New("No bitswap multiaddr available")), nil
}

if tsk.Metadata["retrieve_type"] == string(task.Spade) {
depth, err := strconv.ParseUint(tsk.Metadata["max_traverse_depth"], 10, 32)

if err != nil {
return nil, errors.Wrap(err, "failed to parse max_traverse_depth")
}

return client.SpadeTraversal(ctx, peer.AddrInfo{
ID: peerID,
Addrs: addrs,
}, contentCID, uint(depth),
)
}

//nolint:wrapcheck
return client.Retrieve(ctx, peer.AddrInfo{
ID: peerID,
Expand Down
Loading