diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 16c09ab2b0..353be00f8b 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -13,7 +13,6 @@ import ( "github.com/ipfs/boxo/blockstore" "github.com/ipfs/boxo/exchange" - "github.com/ipfs/boxo/provider" "github.com/ipfs/boxo/verifcid" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -74,21 +73,10 @@ type BoundedBlockService interface { var _ BoundedBlockService = (*blockService)(nil) -// ProvidingBlockService is a Blockservice which provides new blocks to a provider. -type ProvidingBlockService interface { - BlockService - - // Provider can return nil, then no provider is used. - Provider() provider.Provider -} - -var _ ProvidingBlockService = (*blockService)(nil) - type blockService struct { allowlist verifcid.Allowlist blockstore blockstore.Blockstore exchange exchange.Interface - provider provider.Provider // If checkFirst is true then first check that a block doesn't // already exist to avoid republishing the block on the exchange. checkFirst bool @@ -111,13 +99,6 @@ func WithAllowlist(allowlist verifcid.Allowlist) Option { } } -// WithProvider allows to advertise anything that is added through the blockservice. -func WithProvider(prov provider.Provider) Option { - return func(bs *blockService) { - bs.provider = prov - } -} - // New creates a BlockService with given datastore instance. func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) BlockService { if exchange == nil { @@ -140,11 +121,6 @@ func New(bs blockstore.Blockstore, exchange exchange.Interface, opts ...Option) // Blockstore returns the blockstore behind this blockservice. func (s *blockService) Blockstore() blockstore.Blockstore { - if s.provider != nil { - // FIXME: this is a hack remove once ipfs/boxo#567 is solved. - return providingBlockstore{s.blockstore, s.provider} - } - return s.blockstore } @@ -157,13 +133,23 @@ func (s *blockService) Allowlist() verifcid.Allowlist { return s.allowlist } -func (s *blockService) Provider() provider.Provider { - return s.provider +// NewSession creates a new session that allows for +// controlled exchange of wantlists to decrease the bandwidth overhead. +// If the current exchange is a SessionExchange, a new exchange +// session will be created. Otherwise, the current exchange will be used +// directly. +// Sessions are lazily setup, this is cheap. +func NewSession(ctx context.Context, bs BlockService) *Session { + ses := grabSessionFromContext(ctx, bs) + if ses != nil { + return ses + } + + return newSession(ctx, bs) } -// NewSession creates a new session that allows for controlled exchange of -// wantlists to decrease the bandwidth overhead. -func NewSession(ctx context.Context, bs BlockService) *Session { +// newSession is like [NewSession] but it does not attempt to reuse session from the existing context. +func newSession(ctx context.Context, bs BlockService) *Session { return &Session{bs: bs, sesctx: ctx} } @@ -183,7 +169,7 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { } } - if err = s.blockstore.Put(ctx, o); err != nil { + if err := s.blockstore.Put(ctx, o); err != nil { return err } @@ -194,11 +180,6 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error { logger.Errorf("NotifyNewBlocks: %s", err.Error()) } } - if s.provider != nil { - if err := s.provider.Provide(ctx, o.Cid(), true); err != nil { - logger.Errorf("Provide: %s", err.Error()) - } - } return nil } @@ -245,19 +226,16 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error { logger.Errorf("NotifyNewBlocks: %s", err.Error()) } } - if s.provider != nil { - for _, o := range toput { - if err := s.provider.Provide(ctx, o.Cid(), true); err != nil { - logger.Errorf("Provide: %s", err.Error()) - } - } - } return nil } // GetBlock retrieves a particular block from the service, // Getting it from the datastore using the key (hash). func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) { + if ses := grabSessionFromContext(ctx, s); ses != nil { + return ses.GetBlock(ctx, c) + } + ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c))) defer span.End() @@ -275,7 +253,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func return nil, err } - provider, blockstore := grabProviderAndBlockstoreFromBlockservice(bs) + blockstore := bs.Blockstore() block, err := blockstore.Get(ctx, c) switch { @@ -309,12 +287,6 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func return nil, err } } - if provider != nil { - err = provider.Provide(ctx, blk.Cid(), true) - if err != nil { - return nil, err - } - } logger.Debugf("BlockService.BlockFetched %s", c) return blk, nil } @@ -323,6 +295,10 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func // the returned channel. // NB: No guarantees are made about order. func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block { + if ses := grabSessionFromContext(ctx, s); ses != nil { + return ses.GetBlocks(ctx, ks) + } + ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks") defer span.End() @@ -360,7 +336,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet ks = ks2 } - provider, bs := grabProviderAndBlockstoreFromBlockservice(blockservice) + bs := blockservice.Blockstore() var misses []cid.Cid for _, c := range ks { @@ -419,14 +395,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet cache[0] = nil // early gc } - if provider != nil { - err = provider.Provide(ctx, b.Cid(), true) - if err != nil { - logger.Errorf("could not tell the provider about new blocks: %s", err) - return - } - } - select { case out <- b: case <-ctx.Done(): @@ -506,6 +474,43 @@ func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Blo var _ BlockGetter = (*Session)(nil) +// ContextWithSession is a helper which creates a context with an embded session, +// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService] +// will be redirected to this same session instead. +// Sessions are lazily setup, this is cheap. +// It wont make a new session if one exists already in the context. +func ContextWithSession(ctx context.Context, bs BlockService) context.Context { + if grabSessionFromContext(ctx, bs) != nil { + return ctx + } + return EmbedSessionInContext(ctx, newSession(ctx, bs)) +} + +// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session. +func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context { + // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice. + return context.WithValue(ctx, ses.bs, ses) +} + +// grabSessionFromContext returns nil if the session was not found +// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety, +// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app. +// By having this private we allow consumers to follow the trace of where the blockservice is passed and used. +func grabSessionFromContext(ctx context.Context, bs BlockService) *Session { + s := ctx.Value(bs) + if s == nil { + return nil + } + + ss, ok := s.(*Session) + if !ok { + // idk what to do here, that kinda sucks, giveup + return nil + } + + return ss +} + // grabAllowlistFromBlockservice never returns nil func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { if bbs, ok := bs.(BoundedBlockService); ok { @@ -513,14 +518,3 @@ func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist { } return verifcid.DefaultAllowlist } - -// grabProviderAndBlockstoreFromBlockservice can return nil if no provider is used. -func grabProviderAndBlockstoreFromBlockservice(bs BlockService) (provider.Provider, blockstore.Blockstore) { - if bbs, ok := bs.(*blockService); ok { - return bbs.provider, bbs.blockstore - } - if bbs, ok := bs.(ProvidingBlockService); ok { - return bbs.Provider(), bbs.Blockstore() - } - return nil, bs.Blockstore() -} diff --git a/blockservice/blockservice_test.go b/blockservice/blockservice_test.go index 14fed0a17f..29350ff37c 100644 --- a/blockservice/blockservice_test.go +++ b/blockservice/blockservice_test.go @@ -288,102 +288,67 @@ func TestAllowlist(t *testing.T) { check(NewSession(ctx, blockservice).GetBlock) } -type wrappedBlockservice struct { - BlockService +type fakeIsNewSessionCreateExchange struct { + ses exchange.Fetcher + newSessionWasCalled bool } -type mockProvider []cid.Cid +var _ exchange.SessionExchange = (*fakeIsNewSessionCreateExchange)(nil) -func (p *mockProvider) Provide(ctx context.Context, c cid.Cid, announce bool) error { - *p = append(*p, c) +func (*fakeIsNewSessionCreateExchange) Close() error { return nil } -func TestProviding(t *testing.T) { - t.Parallel() - a := assert.New(t) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +func (*fakeIsNewSessionCreateExchange) GetBlock(context.Context, cid.Cid) (blocks.Block, error) { + panic("should call on the session") +} - blocks := random.BlocksOfSize(12, blockSize) +func (*fakeIsNewSessionCreateExchange) GetBlocks(context.Context, []cid.Cid) (<-chan blocks.Block, error) { + panic("should call on the session") +} - exchange := blockstore.NewBlockstore(ds.NewMapDatastore()) +func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fetcher { + f.newSessionWasCalled = true + return f.ses +} - prov := mockProvider{} - blockservice := New(blockstore.NewBlockstore(ds.NewMapDatastore()), offline.Exchange(exchange), WithProvider(&prov)) - var added []cid.Cid +func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error { + return nil +} - // Adding one block provide it. - a.NoError(blockservice.AddBlock(ctx, blocks[0])) - added = append(added, blocks[0].Cid()) - blocks = blocks[1:] +func TestContextSession(t *testing.T) { + t.Parallel() + a := assert.New(t) - // Adding multiple blocks provide them. - a.NoError(blockservice.AddBlocks(ctx, blocks[0:2])) - added = append(added, blocks[0].Cid(), blocks[1].Cid()) - blocks = blocks[2:] + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // Downloading one block provide it. - a.NoError(exchange.Put(ctx, blocks[0])) - _, err := blockservice.GetBlock(ctx, blocks[0].Cid()) - a.NoError(err) - added = append(added, blocks[0].Cid()) - blocks = blocks[1:] - - // Downloading multiple blocks provide them. - a.NoError(exchange.PutMany(ctx, blocks[0:2])) - cids := []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} - var got []cid.Cid - for b := range blockservice.GetBlocks(ctx, cids) { - got = append(got, b.Cid()) - } - added = append(added, cids...) - a.ElementsMatch(cids, got) - blocks = blocks[2:] + blks := random.BlocksOfSize(2, blockSize) + block1 := blks[0] + block2 := blks[1] - session := NewSession(ctx, blockservice) + bs := blockstore.NewBlockstore(ds.NewMapDatastore()) + a.NoError(bs.Put(ctx, block1)) + a.NoError(bs.Put(ctx, block2)) + sesEx := &fakeIsNewSessionCreateExchange{ses: offline.Exchange(bs)} - // Downloading one block over a session provide it. - a.NoError(exchange.Put(ctx, blocks[0])) - _, err = session.GetBlock(ctx, blocks[0].Cid()) - a.NoError(err) - added = append(added, blocks[0].Cid()) - blocks = blocks[1:] - - // Downloading multiple blocks over a session provide them. - a.NoError(exchange.PutMany(ctx, blocks[0:2])) - cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} - got = nil - for b := range session.GetBlocks(ctx, cids) { - got = append(got, b.Cid()) - } - a.ElementsMatch(cids, got) - added = append(added, cids...) - blocks = blocks[2:] + service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx) - // Test wrapping the blockservice like nopfs does. - session = NewSession(ctx, wrappedBlockservice{blockservice}) + ctx = ContextWithSession(ctx, service) - // Downloading one block over a wrapped blockservice session provide it. - a.NoError(exchange.Put(ctx, blocks[0])) - _, err = session.GetBlock(ctx, blocks[0].Cid()) + b, err := service.GetBlock(ctx, block1.Cid()) a.NoError(err) - added = append(added, blocks[0].Cid()) - blocks = blocks[1:] - - // Downloading multiple blocks over a wrapped blockservice session provide them. - a.NoError(exchange.PutMany(ctx, blocks[0:2])) - cids = []cid.Cid{blocks[0].Cid(), blocks[1].Cid()} - got = nil - for b := range session.GetBlocks(ctx, cids) { - got = append(got, b.Cid()) - } - a.ElementsMatch(cids, got) - added = append(added, cids...) - blocks = blocks[2:] - - a.Empty(blocks) - - a.ElementsMatch(added, []cid.Cid(prov)) + a.Equal(b.RawData(), block1.RawData()) + a.True(sesEx.newSessionWasCalled, "new session from context should be created") + sesEx.newSessionWasCalled = false + + bchan := service.GetBlocks(ctx, []cid.Cid{block2.Cid()}) + a.Equal((<-bchan).RawData(), block2.RawData()) + a.False(sesEx.newSessionWasCalled, "session should be reused in context") + + a.Equal( + NewSession(ctx, service), + NewSession(ContextWithSession(ctx, service), service), + "session must be deduped in all invocations on the same context", + ) } diff --git a/blockservice/test/mock.go b/blockservice/test/mock.go index be30658f8e..77eeed127a 100644 --- a/blockservice/test/mock.go +++ b/blockservice/test/mock.go @@ -18,7 +18,7 @@ func Mocks(n int, opts ...blockservice.Option) []blockservice.BlockService { var servs []blockservice.BlockService for _, i := range instances { servs = append(servs, blockservice.New(i.Blockstore, - i.Exchange, append(opts, blockservice.WithProvider(i.Routing))...)) + i.Exchange, opts...)) } return servs }