Skip to content

Commit

Permalink
Merge pull request #4 from vulcanize/ian/tests_for_upstream
Browse files Browse the repository at this point in the history
GetMany blockstore method + tests for upstream
  • Loading branch information
i-norden authored Oct 12, 2023
2 parents 6602207 + e5b1548 commit 05ad44f
Show file tree
Hide file tree
Showing 7 changed files with 436 additions and 623 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The following emojis are used to highlight certain changes:
## [Unreleased]

### Added
* [GetMany blockstore implementation](https://github.com/vulcanize/boxo/pull/1)
* Requires https://github.com/vulcanize/go-datastore/releases/tag/v0.6.1-internal

* `boxo/gateway`:
* A new `WithResolver(...)` option can be used with `NewBlocksBackend(...)` allowing the user to pass their custom `Resolver` implementation.
Expand Down
228 changes: 228 additions & 0 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package blockstore
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -64,6 +65,12 @@ type Blockstore interface {
HashOnRead(enabled bool)
}

// GetManyBlockstore is a blockstore interface that supports a GetMany method
type GetManyBlockstore interface {
Blockstore
GetMany(context.Context, []cid.Cid) ([]blocks.Block, []cid.Cid, error)
}

// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
Expand Down Expand Up @@ -310,6 +317,227 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return output, nil
}

// GetManyOption is a getManyBlockStore option implementation
type GetManyOption struct {
f func(bs *getManyBlockStore)
}

// NewGetManyBlockstore returns a default GetManyBlockstore implementation
// using the provided datastore.TxnDatastore backend.
func NewGetManyBlockstore(d ds.TxnDatastore, opts ...GetManyOption) GetManyBlockstore {
bs := &getManyBlockStore{
datastore: d,
}

for _, o := range opts {
o.f(bs)
}

if !bs.noPrefix {
bs.datastore = dsns.WrapTxnDatastore(bs.datastore, BlockPrefix)
}
return bs
}

type getManyBlockStore struct {
datastore ds.TxnDatastore

rehash atomic.Bool
writeThrough bool
noPrefix bool
}

func (bs *getManyBlockStore) HashOnRead(enabled bool) {
bs.rehash.Store(enabled)
}

func (bs *getManyBlockStore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
if !k.Defined() {
logger.Error("undefined cid in blockstore")
return nil, ipld.ErrNotFound{Cid: k}
}
bdata, err := bs.datastore.Get(ctx, dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return nil, ipld.ErrNotFound{Cid: k}
}
if err != nil {
return nil, err
}
if bs.rehash.Load() {
rbcid, err := k.Prefix().Sum(bdata)
if err != nil {
return nil, err
}

if !rbcid.Equals(k) {
return nil, ErrHashMismatch
}

return blocks.NewBlockWithCid(bdata, rbcid)
}
return blocks.NewBlockWithCid(bdata, k)
}

func (bs *getManyBlockStore) GetMany(ctx context.Context, cs []cid.Cid) ([]blocks.Block, []cid.Cid, error) {
if len(cs) == 1 {
// performance fast-path
block, err := bs.Get(ctx, cs[0])
return []blocks.Block{block}, nil, err
}

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return nil, nil, err
}
blks := make([]blocks.Block, 0, len(cs))
missingCIDs := make([]cid.Cid, 0, len(cs))
for _, c := range cs {
if !c.Defined() {
logger.Error("undefined cid in blockstore")
return nil, nil, ipld.ErrNotFound{Cid: c}
}
bdata, err := t.Get(ctx, dshelp.MultihashToDsKey(c.Hash()))
if err != nil {
if err == ds.ErrNotFound {
missingCIDs = append(missingCIDs, c)
} else {
return nil, nil, err
}
} else {
if bs.rehash.Load() {
rbcid, err := c.Prefix().Sum(bdata)
if err != nil {
return nil, nil, err
}

if !rbcid.Equals(c) {
return nil, nil, fmt.Errorf("block in storage has different hash (%x) than requested (%x)", rbcid.Hash(), c.Hash())
}

blk, err := blocks.NewBlockWithCid(bdata, rbcid)
if err != nil {
return nil, nil, err
}

blks = append(blks, blk)
} else {
blk, err := blocks.NewBlockWithCid(bdata, c)
if err != nil {
return nil, nil, err
}

blks = append(blks, blk)
}
}
}
return blks, missingCIDs, t.Commit(ctx)
}

func (bs *getManyBlockStore) Put(ctx context.Context, block blocks.Block) error {
k := dshelp.MultihashToDsKey(block.Cid().Hash())

// Has is cheaper than Put, so see if we already have it
if !bs.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
return nil // already stored.
}
}
return bs.datastore.Put(ctx, k, block.RawData())
}

func (bs *getManyBlockStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
if len(blocks) == 1 {
// performance fast-path
return bs.Put(ctx, blocks[0])
}

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return err
}
for _, b := range blocks {
k := dshelp.MultihashToDsKey(b.Cid().Hash())

if !bs.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
continue
}
}

err = t.Put(ctx, k, b.RawData())
if err != nil {
return err
}
}
return t.Commit(ctx)
}

func (bs *getManyBlockStore) Has(ctx context.Context, k cid.Cid) (bool, error) {
return bs.datastore.Has(ctx, dshelp.MultihashToDsKey(k.Hash()))
}

func (bs *getManyBlockStore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
size, err := bs.datastore.GetSize(ctx, dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return -1, ipld.ErrNotFound{Cid: k}
}
return size, err
}

func (bs *getManyBlockStore) DeleteBlock(ctx context.Context, k cid.Cid) error {
return bs.datastore.Delete(ctx, dshelp.MultihashToDsKey(k.Hash()))
}

// AllKeysChan runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
//
// AllKeysChan respects context.
func (bs *getManyBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {

// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
res, err := bs.datastore.Query(ctx, q)
if err != nil {
return nil, err
}

output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer func() {
res.Close() // ensure exit (signals early exit, too)
close(output)
}()

for {
e, ok := res.NextSync()
if !ok {
return
}
if e.Error != nil {
logger.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
return
}

// need to convert to key.Key using key.KeyFromDsKey.
bk, err := dshelp.BinaryFromDsKey(ds.RawKey(e.Key))
if err != nil {
logger.Warnf("error parsing key from binary: %s", err)
continue
}
k := cid.NewCidV1(cid.Raw, bk)
select {
case <-ctx.Done():
return
case output <- k:
}
}
}()

return output, nil
}

// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
Expand Down
104 changes: 104 additions & 0 deletions blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"testing"

dstest "github.com/ipfs/go-datastore/test"

u "github.com/ipfs/boxo/util"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -72,6 +74,108 @@ func TestCidv0v1(t *testing.T) {
}
}

func TestGetManyWhenKeyNotPresent(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
c1 := cid.NewCidV0(u.Hash([]byte("stuff")))
c2 := cid.NewCidV0(u.Hash([]byte("stuff2")))

blks, missingCIDs, err := bs.GetMany(bg, []cid.Cid{c1, c2})

if len(blks) != 0 {
t.Error("no blocks expected")
}
if len(missingCIDs) != 2 {
t.Error("2 missing cids expected")
}
if err != nil {
t.Error("no error expected")
}
}

func TestGetManyWhenKeyIsNil(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
_, _, err := bs.GetMany(bg, []cid.Cid{{}, {}})
if !ipld.IsNotFound(err) {
t.Fail()
}
}

func TestPutsThenGetManyBlock(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg, []cid.Cid{block1.Cid(), block2.Cid(), block3.Cid(), block4.Cid()})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), block3.Cid().Bytes()) {
t.Fail()
}
}

func TestCidv0v1Many(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg,
[]cid.Cid{cid.NewCidV1(cid.DagProtobuf, block1.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block2.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block4.Cid().Hash())})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()).Bytes()) {
t.Fail()
}
}

func TestPutThenGetSizeBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
Expand Down
Loading

0 comments on commit 05ad44f

Please sign in to comment.