Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GetMany blockstore method + tests for upstream #4

Merged
merged 9 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ The following emojis are used to highlight certain changes:
## [Unreleased]

### Added
* [GetMany blockstore implementation](https://github.com/vulcanize/boxo/pull/1)
* Requires https://github.com/vulcanize/go-datastore/releases/tag/v0.6.1-internal

* `boxo/gateway`:
* A new `WithResolver(...)` option can be used with `NewBlocksBackend(...)` allowing the user to pass their custom `Resolver` implementation.
Expand Down
228 changes: 228 additions & 0 deletions blockstore/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package blockstore
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -64,6 +65,12 @@ type Blockstore interface {
HashOnRead(enabled bool)
}

// GetManyBlockstore is a blockstore interface that supports a GetMany method
type GetManyBlockstore interface {
Blockstore
GetMany(context.Context, []cid.Cid) ([]blocks.Block, []cid.Cid, error)
}

// Viewer can be implemented by blockstores that offer zero-copy access to
// values.
//
Expand Down Expand Up @@ -310,6 +317,227 @@ func (bs *blockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return output, nil
}

// GetManyOption is a getManyBlockStore option implementation
type GetManyOption struct {
f func(bs *getManyBlockStore)
}

// NewGetManyBlockstore returns a default GetManyBlockstore implementation
// using the provided datastore.TxnDatastore backend.
func NewGetManyBlockstore(d ds.TxnDatastore, opts ...GetManyOption) GetManyBlockstore {
bs := &getManyBlockStore{
datastore: d,
}

for _, o := range opts {
o.f(bs)
}

if !bs.noPrefix {
bs.datastore = dsns.WrapTxnDatastore(bs.datastore, BlockPrefix)
}
return bs
}

type getManyBlockStore struct {
datastore ds.TxnDatastore

rehash atomic.Bool
writeThrough bool
noPrefix bool
}

func (bs *getManyBlockStore) HashOnRead(enabled bool) {
bs.rehash.Store(enabled)
}

func (bs *getManyBlockStore) Get(ctx context.Context, k cid.Cid) (blocks.Block, error) {
if !k.Defined() {
logger.Error("undefined cid in blockstore")
return nil, ipld.ErrNotFound{Cid: k}
}
bdata, err := bs.datastore.Get(ctx, dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return nil, ipld.ErrNotFound{Cid: k}
}
if err != nil {
return nil, err
}
if bs.rehash.Load() {
rbcid, err := k.Prefix().Sum(bdata)
if err != nil {
return nil, err
}

if !rbcid.Equals(k) {
return nil, ErrHashMismatch
}

return blocks.NewBlockWithCid(bdata, rbcid)
}
return blocks.NewBlockWithCid(bdata, k)
}

func (bs *getManyBlockStore) GetMany(ctx context.Context, cs []cid.Cid) ([]blocks.Block, []cid.Cid, error) {
if len(cs) == 1 {
// performance fast-path
block, err := bs.Get(ctx, cs[0])
return []blocks.Block{block}, nil, err
}

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return nil, nil, err
}
blks := make([]blocks.Block, 0, len(cs))
missingCIDs := make([]cid.Cid, 0, len(cs))
for _, c := range cs {
if !c.Defined() {
logger.Error("undefined cid in blockstore")
return nil, nil, ipld.ErrNotFound{Cid: c}
}
bdata, err := t.Get(ctx, dshelp.MultihashToDsKey(c.Hash()))
if err != nil {
if err == ds.ErrNotFound {
missingCIDs = append(missingCIDs, c)
} else {
return nil, nil, err
}
} else {
if bs.rehash.Load() {
rbcid, err := c.Prefix().Sum(bdata)
if err != nil {
return nil, nil, err
}

if !rbcid.Equals(c) {
return nil, nil, fmt.Errorf("block in storage has different hash (%x) than requested (%x)", rbcid.Hash(), c.Hash())
}

blk, err := blocks.NewBlockWithCid(bdata, rbcid)
if err != nil {
return nil, nil, err
}

blks = append(blks, blk)
} else {
blk, err := blocks.NewBlockWithCid(bdata, c)
if err != nil {
return nil, nil, err
}

blks = append(blks, blk)
}
}
}
return blks, missingCIDs, t.Commit(ctx)
}

func (bs *getManyBlockStore) Put(ctx context.Context, block blocks.Block) error {
k := dshelp.MultihashToDsKey(block.Cid().Hash())

// Has is cheaper than Put, so see if we already have it
if !bs.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
return nil // already stored.
}
}
return bs.datastore.Put(ctx, k, block.RawData())
}

func (bs *getManyBlockStore) PutMany(ctx context.Context, blocks []blocks.Block) error {
if len(blocks) == 1 {
// performance fast-path
return bs.Put(ctx, blocks[0])
}

t, err := bs.datastore.NewTransaction(ctx, false)
if err != nil {
return err
}
for _, b := range blocks {
k := dshelp.MultihashToDsKey(b.Cid().Hash())

if !bs.writeThrough {
exists, err := bs.datastore.Has(ctx, k)
if err == nil && exists {
continue
}
}

err = t.Put(ctx, k, b.RawData())
if err != nil {
return err
}
}
return t.Commit(ctx)
}

func (bs *getManyBlockStore) Has(ctx context.Context, k cid.Cid) (bool, error) {
return bs.datastore.Has(ctx, dshelp.MultihashToDsKey(k.Hash()))
}

func (bs *getManyBlockStore) GetSize(ctx context.Context, k cid.Cid) (int, error) {
size, err := bs.datastore.GetSize(ctx, dshelp.MultihashToDsKey(k.Hash()))
if err == ds.ErrNotFound {
return -1, ipld.ErrNotFound{Cid: k}
}
return size, err
}

func (bs *getManyBlockStore) DeleteBlock(ctx context.Context, k cid.Cid) error {
return bs.datastore.Delete(ctx, dshelp.MultihashToDsKey(k.Hash()))
}

// AllKeysChan runs a query for keys from the blockstore.
// this is very simplistic, in the future, take dsq.Query as a param?
//
// AllKeysChan respects context.
func (bs *getManyBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {

// KeysOnly, because that would be _a lot_ of data.
q := dsq.Query{KeysOnly: true}
res, err := bs.datastore.Query(ctx, q)
if err != nil {
return nil, err
}

output := make(chan cid.Cid, dsq.KeysOnlyBufSize)
go func() {
defer func() {
res.Close() // ensure exit (signals early exit, too)
close(output)
}()

for {
e, ok := res.NextSync()
if !ok {
return
}
if e.Error != nil {
logger.Errorf("blockstore.AllKeysChan got err: %s", e.Error)
return
}

// need to convert to key.Key using key.KeyFromDsKey.
bk, err := dshelp.BinaryFromDsKey(ds.RawKey(e.Key))
if err != nil {
logger.Warnf("error parsing key from binary: %s", err)
continue
}
k := cid.NewCidV1(cid.Raw, bk)
select {
case <-ctx.Done():
return
case output <- k:
}
}
}()

return output, nil
}

// NewGCLocker returns a default implementation of
// GCLocker using standard [RW] mutexes.
func NewGCLocker() GCLocker {
Expand Down
104 changes: 104 additions & 0 deletions blockstore/blockstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"testing"

dstest "github.com/ipfs/go-datastore/test"

u "github.com/ipfs/boxo/util"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -72,6 +74,108 @@ func TestCidv0v1(t *testing.T) {
}
}

func TestGetManyWhenKeyNotPresent(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
c1 := cid.NewCidV0(u.Hash([]byte("stuff")))
c2 := cid.NewCidV0(u.Hash([]byte("stuff2")))

blks, missingCIDs, err := bs.GetMany(bg, []cid.Cid{c1, c2})

if len(blks) != 0 {
t.Error("no blocks expected")
}
if len(missingCIDs) != 2 {
t.Error("2 missing cids expected")
}
if err != nil {
t.Error("no error expected")
}
}

func TestGetManyWhenKeyIsNil(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
_, _, err := bs.GetMany(bg, []cid.Cid{{}, {}})
if !ipld.IsNotFound(err) {
t.Fail()
}
}

func TestPutsThenGetManyBlock(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg, []cid.Cid{block1.Cid(), block2.Cid(), block3.Cid(), block4.Cid()})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), block3.Cid().Bytes()) {
t.Fail()
}
}

func TestCidv0v1Many(t *testing.T) {
bs := NewGetManyBlockstore(dstest.NewTestTxnDatastore(ds.NewMapDatastore(), false))
block1 := blocks.NewBlock([]byte("some data1"))
block2 := blocks.NewBlock([]byte("some data2"))
block3 := blocks.NewBlock([]byte("some data3"))
block4 := blocks.NewBlock([]byte("some data4"))

err := bs.PutMany(bg, []blocks.Block{block1, block2, block4})
if err != nil {
t.Fatal(err)
}

blocksFromBlockstore, missingCIDs, err := bs.GetMany(bg,
[]cid.Cid{cid.NewCidV1(cid.DagProtobuf, block1.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block2.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()),
cid.NewCidV1(cid.DagProtobuf, block4.Cid().Hash())})
if err != nil {
t.Fatal(err)
}
if len(blocksFromBlockstore) != 3 {
t.Fatal("unexpected number of blocks")
}
if len(missingCIDs) != 1 {
t.Fatal("unexpected number of missing CIDs")
}
if !bytes.Equal(blocksFromBlockstore[0].RawData(), block1.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[1].RawData(), block2.RawData()) {
t.Fail()
}
if !bytes.Equal(blocksFromBlockstore[2].RawData(), block4.RawData()) {
t.Fail()
}
if !bytes.Equal(missingCIDs[0].Bytes(), cid.NewCidV1(cid.DagProtobuf, block3.Cid().Hash()).Bytes()) {
t.Fail()
}
}

func TestPutThenGetSizeBlock(t *testing.T) {
bs := NewBlockstore(ds_sync.MutexWrap(ds.NewMapDatastore()))
block := blocks.NewBlock([]byte("some data"))
Expand Down
Loading