diff --git a/.mockery.yaml b/.mockery.yaml index 60434ff4dd..2a068cfc86 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -9,8 +9,13 @@ packages: github.com/e2b-dev/infra/packages/shared/pkg/storage: interfaces: - StorageObjectProvider: + ObjectProvider: config: dir: packages/shared/pkg/storage/mocks - filename: mocks.go + filename: mockobjectprovider.go + pkgname: storagemocks + SeekableObjectProvider: + config: + dir: packages/shared/pkg/storage/mocks + filename: mockseekableobjectprovider.go pkgname: storagemocks diff --git a/packages/orchestrator/cmd/inspect-data/main.go b/packages/orchestrator/cmd/inspect-data/main.go index 237dcdf352..a0650f39d7 100644 --- a/packages/orchestrator/cmd/inspect-data/main.go +++ b/packages/orchestrator/cmd/inspect-data/main.go @@ -24,14 +24,17 @@ func main() { var storagePath string var blockSize int64 + var objectType storage.SeekableObjectType switch *kind { case "memfile": storagePath = template.StorageMemfilePath() blockSize = 2097152 + objectType = storage.MemfileObjectType case "rootfs": storagePath = template.StorageRootfsPath() blockSize = 4096 + objectType = storage.RootFSObjectType default: log.Fatalf("invalid kind: %s", *kind) } @@ -47,7 +50,7 @@ func main() { log.Fatalf("failed to get storage provider: %s", err) } - obj, err := storage.OpenObject(ctx, storagePath) + obj, err := storage.OpenSeekableObject(ctx, storagePath, objectType) if err != nil { log.Fatalf("failed to open object: %s", err) } diff --git a/packages/orchestrator/cmd/inspect-header/main.go b/packages/orchestrator/cmd/inspect-header/main.go index 0dcab576b1..32e364dc70 100644 --- a/packages/orchestrator/cmd/inspect-header/main.go +++ b/packages/orchestrator/cmd/inspect-header/main.go @@ -22,14 +22,19 @@ func main() { } var storagePath string + var objectType storage.ObjectType switch *kind { case "memfile": storagePath = template.StorageMemfileHeaderPath() + objectType = storage.MemfileHeaderObjectType case "rootfs": storagePath = template.StorageRootfsHeaderPath() + objectType = storage.RootFSHeaderObjectType default: log.Fatalf("invalid kind: %s", *kind) + + return } ctx := context.Background() @@ -38,7 +43,7 @@ func main() { log.Fatalf("failed to get storage provider: %s", err) } - obj, err := storage.OpenObject(ctx, storagePath) + obj, err := storage.OpenObject(ctx, storagePath, objectType) if err != nil { log.Fatalf("failed to open object: %s", err) } diff --git a/packages/orchestrator/cmd/simulate-headers-merge/main.go b/packages/orchestrator/cmd/simulate-headers-merge/main.go index 963c5f1906..931a6424f3 100644 --- a/packages/orchestrator/cmd/simulate-headers-merge/main.go +++ b/packages/orchestrator/cmd/simulate-headers-merge/main.go @@ -31,14 +31,17 @@ func main() { var baseStoragePath string var diffStoragePath string + var objectType storage.ObjectType switch *kind { case "memfile": baseStoragePath = baseTemplate.StorageMemfileHeaderPath() diffStoragePath = diffTemplate.StorageMemfileHeaderPath() + objectType = storage.MemfileHeaderObjectType case "rootfs": baseStoragePath = baseTemplate.StorageRootfsHeaderPath() diffStoragePath = diffTemplate.StorageRootfsHeaderPath() + objectType = storage.RootFSHeaderObjectType default: log.Fatalf("invalid kind: %s", *kind) } @@ -50,12 +53,12 @@ func main() { log.Fatalf("failed to get storage provider: %s", err) } - baseObj, err := storageProvider.OpenObject(ctx, baseStoragePath) + baseObj, err := storageProvider.OpenObject(ctx, baseStoragePath, objectType) if err != nil { log.Fatalf("failed to open object: %s", err) } - diffObj, err := storageProvider.OpenObject(ctx, diffStoragePath) + diffObj, err := storageProvider.OpenObject(ctx, diffStoragePath, objectType) if err != nil { log.Fatalf("failed to open object: %s", err) } diff --git a/packages/orchestrator/internal/sandbox/build/build.go b/packages/orchestrator/internal/sandbox/build/build.go index 386577f848..b4695434e4 100644 --- a/packages/orchestrator/internal/sandbox/build/build.go +++ b/packages/orchestrator/internal/sandbox/build/build.go @@ -116,7 +116,7 @@ func (b *File) Slice(ctx context.Context, off, _ int64) ([]byte, error) { } func (b *File) getBuild(ctx context.Context, buildID *uuid.UUID) (Diff, error) { - storageDiff := newStorageDiff( + storageDiff, err := newStorageDiff( b.store.cachePath, buildID.String(), b.fileType, @@ -124,6 +124,9 @@ func (b *File) getBuild(ctx context.Context, buildID *uuid.UUID) (Diff, error) { b.metrics, b.persistence, ) + if err != nil { + return nil, fmt.Errorf("failed to create storage diff: %w", err) + } source, err := b.store.Get(ctx, storageDiff) if err != nil { diff --git a/packages/orchestrator/internal/sandbox/build/storage_diff.go b/packages/orchestrator/internal/sandbox/build/storage_diff.go index 34af7adafc..5b6fe0e7a0 100644 --- a/packages/orchestrator/internal/sandbox/build/storage_diff.go +++ b/packages/orchestrator/internal/sandbox/build/storage_diff.go @@ -18,10 +18,12 @@ func storagePath(buildId string, diffType DiffType) string { } type StorageDiff struct { - chunker *utils.SetOnce[*block.Chunker] - cachePath string - cacheKey DiffStoreKey - storagePath string + chunker *utils.SetOnce[*block.Chunker] + cachePath string + cacheKey DiffStoreKey + storagePath string + storageObjectType storage.SeekableObjectType + blockSize int64 metrics blockmetrics.Metrics persistence storage.StorageProvider @@ -29,6 +31,14 @@ type StorageDiff struct { var _ Diff = (*StorageDiff)(nil) +type UnknownDiffTypeError struct { + DiffType DiffType +} + +func (e UnknownDiffTypeError) Error() string { + return fmt.Sprintf("unknown diff type: %s", e.DiffType) +} + func newStorageDiff( basePath string, buildId string, @@ -36,21 +46,37 @@ func newStorageDiff( blockSize int64, metrics blockmetrics.Metrics, persistence storage.StorageProvider, -) *StorageDiff { +) (*StorageDiff, error) { cachePathSuffix := id.Generate() storagePath := storagePath(buildId, diffType) + storageObjectType, ok := storageObjectType(diffType) + if !ok { + return nil, UnknownDiffTypeError{diffType} + } cacheFile := fmt.Sprintf("%s-%s-%s", buildId, diffType, cachePathSuffix) cachePath := filepath.Join(basePath, cacheFile) return &StorageDiff{ - storagePath: storagePath, - cachePath: cachePath, - chunker: utils.NewSetOnce[*block.Chunker](), - blockSize: blockSize, - metrics: metrics, - persistence: persistence, - cacheKey: GetDiffStoreKey(buildId, diffType), + storagePath: storagePath, + storageObjectType: storageObjectType, + cachePath: cachePath, + chunker: utils.NewSetOnce[*block.Chunker](), + blockSize: blockSize, + metrics: metrics, + persistence: persistence, + cacheKey: GetDiffStoreKey(buildId, diffType), + }, nil +} + +func storageObjectType(diffType DiffType) (storage.SeekableObjectType, bool) { + switch diffType { + case Memfile: + return storage.MemfileObjectType, true + case Rootfs: + return storage.RootFSObjectType, true + default: + return storage.UnknownSeekableObjectType, false } } @@ -59,7 +85,7 @@ func (b *StorageDiff) CacheKey() DiffStoreKey { } func (b *StorageDiff) Init(ctx context.Context) error { - obj, err := b.persistence.OpenObject(ctx, b.storagePath) + obj, err := b.persistence.OpenSeekableObject(ctx, b.storagePath, b.storageObjectType) if err != nil { return err } diff --git a/packages/orchestrator/internal/sandbox/template/storage.go b/packages/orchestrator/internal/sandbox/template/storage.go index 3bec8d218d..7ecb193cd2 100644 --- a/packages/orchestrator/internal/sandbox/template/storage.go +++ b/packages/orchestrator/internal/sandbox/template/storage.go @@ -23,6 +23,28 @@ type Storage struct { source *build.File } +func storageHeaderObjectType(diffType build.DiffType) (storage.ObjectType, bool) { + switch diffType { + case build.Memfile: + return storage.MemfileHeaderObjectType, true + case build.Rootfs: + return storage.RootFSHeaderObjectType, true + default: + return storage.UnknownObjectType, false + } +} + +func objectType(diffType build.DiffType) (storage.SeekableObjectType, bool) { + switch diffType { + case build.Memfile: + return storage.MemfileObjectType, true + case build.Rootfs: + return storage.RootFSObjectType, true + default: + return storage.UnknownSeekableObjectType, false + } +} + func NewStorage( ctx context.Context, store *build.DiffStore, @@ -34,7 +56,12 @@ func NewStorage( ) (*Storage, error) { if h == nil { headerObjectPath := buildId + "/" + string(fileType) + storage.HeaderSuffix - headerObject, err := persistence.OpenObject(ctx, headerObjectPath) + headerObjectType, ok := storageHeaderObjectType(fileType) + if !ok { + return nil, build.UnknownDiffTypeError{DiffType: fileType} + } + + headerObject, err := persistence.OpenObject(ctx, headerObjectPath, headerObjectType) if err != nil { return nil, err } @@ -54,7 +81,11 @@ func NewStorage( // If we can't find the diff header in storage, we try to find the "old" style template without a header as a fallback. if h == nil { objectPath := buildId + "/" + string(fileType) - object, err := persistence.OpenObject(ctx, objectPath) + objectType, ok := objectType(fileType) + if !ok { + return nil, build.UnknownDiffTypeError{DiffType: fileType} + } + object, err := persistence.OpenSeekableObject(ctx, objectPath, objectType) if err != nil { return nil, err } diff --git a/packages/orchestrator/internal/sandbox/template/storage_file.go b/packages/orchestrator/internal/sandbox/template/storage_file.go index 96910698b3..c01fe78d30 100644 --- a/packages/orchestrator/internal/sandbox/template/storage_file.go +++ b/packages/orchestrator/internal/sandbox/template/storage_file.go @@ -18,6 +18,7 @@ func newStorageFile( persistence storage.StorageProvider, objectPath string, path string, + objectType storage.ObjectType, ) (*storageFile, error) { f, err := os.Create(path) if err != nil { @@ -26,7 +27,7 @@ func newStorageFile( defer f.Close() - object, err := persistence.OpenObject(ctx, objectPath) + object, err := persistence.OpenObject(ctx, objectPath, objectType) if err != nil { return nil, err } diff --git a/packages/orchestrator/internal/sandbox/template/storage_template.go b/packages/orchestrator/internal/sandbox/template/storage_template.go index f19df74c36..3f21450012 100644 --- a/packages/orchestrator/internal/sandbox/template/storage_template.go +++ b/packages/orchestrator/internal/sandbox/template/storage_template.go @@ -87,6 +87,7 @@ func (t *storageTemplate) Fetch(ctx context.Context, buildStore *build.DiffStore t.persistence, t.files.StorageSnapfilePath(), t.files.CacheSnapfilePath(), + storage.SnapfileObjectType, ) if snapfileErr != nil { errMsg := fmt.Errorf("failed to fetch snapfile: %w", snapfileErr) @@ -119,6 +120,7 @@ func (t *storageTemplate) Fetch(ctx context.Context, buildStore *build.DiffStore t.persistence, t.files.StorageMetadataPath(), t.files.CacheMetadataPath(), + storage.MetadataObjectType, ) if err != nil && !errors.Is(err, storage.ErrObjectNotExist) { sourceErr := fmt.Errorf("failed to fetch metafile: %w", err) diff --git a/packages/orchestrator/internal/sandbox/template_build.go b/packages/orchestrator/internal/sandbox/template_build.go index dc00a4f80a..b232b61f85 100644 --- a/packages/orchestrator/internal/sandbox/template_build.go +++ b/packages/orchestrator/internal/sandbox/template_build.go @@ -38,7 +38,7 @@ func (t *TemplateBuild) Remove(ctx context.Context) error { } func (t *TemplateBuild) uploadMemfileHeader(ctx context.Context, h *headers.Header) error { - object, err := t.persistence.OpenObject(ctx, t.files.StorageMemfileHeaderPath()) + object, err := t.persistence.OpenObject(ctx, t.files.StorageMemfileHeaderPath(), storage.MemfileHeaderObjectType) if err != nil { return err } @@ -57,7 +57,7 @@ func (t *TemplateBuild) uploadMemfileHeader(ctx context.Context, h *headers.Head } func (t *TemplateBuild) uploadMemfile(ctx context.Context, memfilePath string) error { - object, err := t.persistence.OpenObject(ctx, t.files.StorageMemfilePath()) + object, err := t.persistence.OpenSeekableObject(ctx, t.files.StorageMemfilePath(), storage.MemfileObjectType) if err != nil { return err } @@ -71,7 +71,7 @@ func (t *TemplateBuild) uploadMemfile(ctx context.Context, memfilePath string) e } func (t *TemplateBuild) uploadRootfsHeader(ctx context.Context, h *headers.Header) error { - object, err := t.persistence.OpenObject(ctx, t.files.StorageRootfsHeaderPath()) + object, err := t.persistence.OpenObject(ctx, t.files.StorageRootfsHeaderPath(), storage.RootFSHeaderObjectType) if err != nil { return err } @@ -90,7 +90,7 @@ func (t *TemplateBuild) uploadRootfsHeader(ctx context.Context, h *headers.Heade } func (t *TemplateBuild) uploadRootfs(ctx context.Context, rootfsPath string) error { - object, err := t.persistence.OpenObject(ctx, t.files.StorageRootfsPath()) + object, err := t.persistence.OpenSeekableObject(ctx, t.files.StorageRootfsPath(), storage.RootFSObjectType) if err != nil { return err } @@ -105,7 +105,7 @@ func (t *TemplateBuild) uploadRootfs(ctx context.Context, rootfsPath string) err // Snap-file is small enough so we don't use composite upload. func (t *TemplateBuild) uploadSnapfile(ctx context.Context, path string) error { - object, err := t.persistence.OpenObject(ctx, t.files.StorageSnapfilePath()) + object, err := t.persistence.OpenObject(ctx, t.files.StorageSnapfilePath(), storage.SnapfileObjectType) if err != nil { return err } @@ -119,7 +119,7 @@ func (t *TemplateBuild) uploadSnapfile(ctx context.Context, path string) error { // Metadata is small enough so we don't use composite upload. func (t *TemplateBuild) uploadMetadata(ctx context.Context, path string) error { - object, err := t.persistence.OpenObject(ctx, t.files.StorageMetadataPath()) + object, err := t.persistence.OpenObject(ctx, t.files.StorageMetadataPath(), storage.MetadataObjectType) if err != nil { return err } diff --git a/packages/orchestrator/internal/template/build/builder.go b/packages/orchestrator/internal/template/build/builder.go index 9c14c92415..b28eadc8e2 100644 --- a/packages/orchestrator/internal/template/build/builder.go +++ b/packages/orchestrator/internal/template/build/builder.go @@ -337,7 +337,7 @@ func getRootfsSize( s storage.StorageProvider, metadata storage.TemplateFiles, ) (uint64, error) { - obj, err := s.OpenObject(ctx, metadata.StorageRootfsHeaderPath()) + obj, err := s.OpenObject(ctx, metadata.StorageRootfsHeaderPath(), storage.RootFSHeaderObjectType) if err != nil { return 0, fmt.Errorf("error opening rootfs header object: %w", err) } diff --git a/packages/orchestrator/internal/template/build/commands/copy.go b/packages/orchestrator/internal/template/build/commands/copy.go index 7448426934..d402018744 100644 --- a/packages/orchestrator/internal/template/build/commands/copy.go +++ b/packages/orchestrator/internal/template/build/commands/copy.go @@ -80,7 +80,7 @@ func (c *Copy) Execute( } // 1) Download the layer tar file from the storage to the local filesystem - obj, err := c.FilesStorage.OpenObject(ctx, paths.GetLayerFilesCachePath(c.CacheScope, step.GetFilesHash())) + obj, err := c.FilesStorage.OpenObject(ctx, paths.GetLayerFilesCachePath(c.CacheScope, step.GetFilesHash()), storage.BuildLayerFileObjectType) if err != nil { return metadata.Context{}, fmt.Errorf("failed to open files object from storage: %w", err) } diff --git a/packages/orchestrator/internal/template/build/storage/cache/cache.go b/packages/orchestrator/internal/template/build/storage/cache/cache.go index de17dd7ee9..44d69e062a 100644 --- a/packages/orchestrator/internal/template/build/storage/cache/cache.go +++ b/packages/orchestrator/internal/template/build/storage/cache/cache.go @@ -63,7 +63,7 @@ func (h *HashIndex) LayerMetaFromHash(ctx context.Context, hash string) (LayerMe ctx, span := tracer.Start(ctx, "get layer_metadata") defer span.End() - obj, err := h.indexStorage.OpenObject(ctx, paths.HashToPath(h.cacheScope, hash)) + obj, err := h.indexStorage.OpenObject(ctx, paths.HashToPath(h.cacheScope, hash), storage.LayerMetadataObjectType) if err != nil { return LayerMetadata{}, fmt.Errorf("error opening object for layer metadata: %w", err) } @@ -91,7 +91,7 @@ func (h *HashIndex) SaveLayerMeta(ctx context.Context, hash string, template Lay ctx, span := tracer.Start(ctx, "save layer_metadata") defer span.End() - obj, err := h.indexStorage.OpenObject(ctx, paths.HashToPath(h.cacheScope, hash)) + obj, err := h.indexStorage.OpenObject(ctx, paths.HashToPath(h.cacheScope, hash), storage.LayerMetadataObjectType) if err != nil { return fmt.Errorf("error creating object for saving UUID: %w", err) } diff --git a/packages/orchestrator/internal/template/metadata/template_metadata.go b/packages/orchestrator/internal/template/metadata/template_metadata.go index d3c2bc0b69..6c3a924a9d 100644 --- a/packages/orchestrator/internal/template/metadata/template_metadata.go +++ b/packages/orchestrator/internal/template/metadata/template_metadata.go @@ -138,7 +138,7 @@ func fromTemplate(ctx context.Context, s storage.StorageProvider, files storage. ctx, span := tracer.Start(ctx, "from template") defer span.End() - obj, err := s.OpenObject(ctx, files.StorageMetadataPath()) + obj, err := s.OpenObject(ctx, files.StorageMetadataPath(), storage.MetadataObjectType) if err != nil { return Template{}, fmt.Errorf("error opening object for template metadata: %w", err) } diff --git a/packages/orchestrator/internal/template/server/upload_layer_files_template.go b/packages/orchestrator/internal/template/server/upload_layer_files_template.go index 310802efb9..d07aabc0f8 100644 --- a/packages/orchestrator/internal/template/server/upload_layer_files_template.go +++ b/packages/orchestrator/internal/template/server/upload_layer_files_template.go @@ -7,12 +7,13 @@ import ( "github.com/e2b-dev/infra/packages/orchestrator/internal/template/build/storage/paths" templatemanager "github.com/e2b-dev/infra/packages/shared/pkg/grpc/template-manager" + "github.com/e2b-dev/infra/packages/shared/pkg/storage" ) const signedUrlExpiration = time.Minute * 30 func (s *ServerStore) InitLayerFileUpload(ctx context.Context, in *templatemanager.InitLayerFileUploadRequest) (*templatemanager.InitLayerFileUploadResponse, error) { - _, childSpan := tracer.Start(ctx, "template-create") + ctx, childSpan := tracer.Start(ctx, "template-create") defer childSpan.End() // default to scope by template ID @@ -22,7 +23,7 @@ func (s *ServerStore) InitLayerFileUpload(ctx context.Context, in *templatemanag } path := paths.GetLayerFilesCachePath(cacheScope, in.GetHash()) - obj, err := s.buildStorage.OpenObject(ctx, path) + obj, err := s.buildStorage.OpenObject(ctx, path, storage.BuildLayerFileObjectType) if err != nil { return nil, fmt.Errorf("failed to open layer files cache: %w", err) } @@ -32,16 +33,13 @@ func (s *ServerStore) InitLayerFileUpload(ctx context.Context, in *templatemanag return nil, fmt.Errorf("failed to get signed url: %w", err) } - _, err = obj.Size(ctx) + exists, err := obj.Exists(ctx) if err != nil { - return &templatemanager.InitLayerFileUploadResponse{ - Present: false, - Url: &signedUrl, - }, nil + return nil, fmt.Errorf("failed to check if layer files exists: %w", err) } return &templatemanager.InitLayerFileUploadResponse{ - Present: true, + Present: exists, Url: &signedUrl, }, nil } diff --git a/packages/shared/pkg/storage/mocks/mockobjectprovider.go b/packages/shared/pkg/storage/mocks/mockobjectprovider.go new file mode 100644 index 0000000000..0211dcb4f5 --- /dev/null +++ b/packages/shared/pkg/storage/mocks/mockobjectprovider.go @@ -0,0 +1,288 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package storagemocks + +import ( + "context" + "io" + + mock "github.com/stretchr/testify/mock" +) + +// NewMockObjectProvider creates a new instance of MockObjectProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockObjectProvider(t interface { + mock.TestingT + Cleanup(func()) +}) *MockObjectProvider { + mock := &MockObjectProvider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockObjectProvider is an autogenerated mock type for the ObjectProvider type +type MockObjectProvider struct { + mock.Mock +} + +type MockObjectProvider_Expecter struct { + mock *mock.Mock +} + +func (_m *MockObjectProvider) EXPECT() *MockObjectProvider_Expecter { + return &MockObjectProvider_Expecter{mock: &_m.Mock} +} + +// Exists provides a mock function for the type MockObjectProvider +func (_mock *MockObjectProvider) Exists(ctx context.Context) (bool, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Exists") + } + + var r0 bool + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (bool, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) bool); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(bool) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockObjectProvider_Exists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Exists' +type MockObjectProvider_Exists_Call struct { + *mock.Call +} + +// Exists is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockObjectProvider_Expecter) Exists(ctx interface{}) *MockObjectProvider_Exists_Call { + return &MockObjectProvider_Exists_Call{Call: _e.mock.On("Exists", ctx)} +} + +func (_c *MockObjectProvider_Exists_Call) Run(run func(ctx context.Context)) *MockObjectProvider_Exists_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockObjectProvider_Exists_Call) Return(b bool, err error) *MockObjectProvider_Exists_Call { + _c.Call.Return(b, err) + return _c +} + +func (_c *MockObjectProvider_Exists_Call) RunAndReturn(run func(ctx context.Context) (bool, error)) *MockObjectProvider_Exists_Call { + _c.Call.Return(run) + return _c +} + +// Write provides a mock function for the type MockObjectProvider +func (_mock *MockObjectProvider) Write(ctx context.Context, p []byte) (int, error) { + ret := _mock.Called(ctx, p) + + if len(ret) == 0 { + panic("no return value specified for Write") + } + + var r0 int + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) (int, error)); ok { + return returnFunc(ctx, p) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) int); ok { + r0 = returnFunc(ctx, p) + } else { + r0 = ret.Get(0).(int) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte) error); ok { + r1 = returnFunc(ctx, p) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockObjectProvider_Write_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Write' +type MockObjectProvider_Write_Call struct { + *mock.Call +} + +// Write is a helper method to define mock.On call +// - ctx context.Context +// - p []byte +func (_e *MockObjectProvider_Expecter) Write(ctx interface{}, p interface{}) *MockObjectProvider_Write_Call { + return &MockObjectProvider_Write_Call{Call: _e.mock.On("Write", ctx, p)} +} + +func (_c *MockObjectProvider_Write_Call) Run(run func(ctx context.Context, p []byte)) *MockObjectProvider_Write_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockObjectProvider_Write_Call) Return(n int, err error) *MockObjectProvider_Write_Call { + _c.Call.Return(n, err) + return _c +} + +func (_c *MockObjectProvider_Write_Call) RunAndReturn(run func(ctx context.Context, p []byte) (int, error)) *MockObjectProvider_Write_Call { + _c.Call.Return(run) + return _c +} + +// WriteFromFileSystem provides a mock function for the type MockObjectProvider +func (_mock *MockObjectProvider) WriteFromFileSystem(ctx context.Context, path string) error { + ret := _mock.Called(ctx, path) + + if len(ret) == 0 { + panic("no return value specified for WriteFromFileSystem") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = returnFunc(ctx, path) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockObjectProvider_WriteFromFileSystem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteFromFileSystem' +type MockObjectProvider_WriteFromFileSystem_Call struct { + *mock.Call +} + +// WriteFromFileSystem is a helper method to define mock.On call +// - ctx context.Context +// - path string +func (_e *MockObjectProvider_Expecter) WriteFromFileSystem(ctx interface{}, path interface{}) *MockObjectProvider_WriteFromFileSystem_Call { + return &MockObjectProvider_WriteFromFileSystem_Call{Call: _e.mock.On("WriteFromFileSystem", ctx, path)} +} + +func (_c *MockObjectProvider_WriteFromFileSystem_Call) Run(run func(ctx context.Context, path string)) *MockObjectProvider_WriteFromFileSystem_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockObjectProvider_WriteFromFileSystem_Call) Return(err error) *MockObjectProvider_WriteFromFileSystem_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockObjectProvider_WriteFromFileSystem_Call) RunAndReturn(run func(ctx context.Context, path string) error) *MockObjectProvider_WriteFromFileSystem_Call { + _c.Call.Return(run) + return _c +} + +// WriteTo provides a mock function for the type MockObjectProvider +func (_mock *MockObjectProvider) WriteTo(ctx context.Context, w io.Writer) (int64, error) { + ret := _mock.Called(ctx, w) + + if len(ret) == 0 { + panic("no return value specified for WriteTo") + } + + var r0 int64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, io.Writer) (int64, error)); ok { + return returnFunc(ctx, w) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, io.Writer) int64); ok { + r0 = returnFunc(ctx, w) + } else { + r0 = ret.Get(0).(int64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, io.Writer) error); ok { + r1 = returnFunc(ctx, w) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockObjectProvider_WriteTo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteTo' +type MockObjectProvider_WriteTo_Call struct { + *mock.Call +} + +// WriteTo is a helper method to define mock.On call +// - ctx context.Context +// - w io.Writer +func (_e *MockObjectProvider_Expecter) WriteTo(ctx interface{}, w interface{}) *MockObjectProvider_WriteTo_Call { + return &MockObjectProvider_WriteTo_Call{Call: _e.mock.On("WriteTo", ctx, w)} +} + +func (_c *MockObjectProvider_WriteTo_Call) Run(run func(ctx context.Context, w io.Writer)) *MockObjectProvider_WriteTo_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 io.Writer + if args[1] != nil { + arg1 = args[1].(io.Writer) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockObjectProvider_WriteTo_Call) Return(n int64, err error) *MockObjectProvider_WriteTo_Call { + _c.Call.Return(n, err) + return _c +} + +func (_c *MockObjectProvider_WriteTo_Call) RunAndReturn(run func(ctx context.Context, w io.Writer) (int64, error)) *MockObjectProvider_WriteTo_Call { + _c.Call.Return(run) + return _c +} diff --git a/packages/shared/pkg/storage/mocks/mocks.go b/packages/shared/pkg/storage/mocks/mocks.go deleted file mode 100644 index 0a14e10465..0000000000 --- a/packages/shared/pkg/storage/mocks/mocks.go +++ /dev/null @@ -1,411 +0,0 @@ -// Code generated by mockery; DO NOT EDIT. -// github.com/vektra/mockery -// template: testify - -package storagemocks - -import ( - "context" - "io" - - mock "github.com/stretchr/testify/mock" -) - -// NewMockStorageObjectProvider creates a new instance of MockStorageObjectProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockStorageObjectProvider(t interface { - mock.TestingT - Cleanup(func()) -}) *MockStorageObjectProvider { - mock := &MockStorageObjectProvider{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} - -// MockStorageObjectProvider is an autogenerated mock type for the StorageObjectProvider type -type MockStorageObjectProvider struct { - mock.Mock -} - -type MockStorageObjectProvider_Expecter struct { - mock *mock.Mock -} - -func (_m *MockStorageObjectProvider) EXPECT() *MockStorageObjectProvider_Expecter { - return &MockStorageObjectProvider_Expecter{mock: &_m.Mock} -} - -// Delete provides a mock function for the type MockStorageObjectProvider -func (_mock *MockStorageObjectProvider) Delete(ctx context.Context) error { - ret := _mock.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for Delete") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = returnFunc(ctx) - } else { - r0 = ret.Error(0) - } - return r0 -} - -// MockStorageObjectProvider_Delete_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Delete' -type MockStorageObjectProvider_Delete_Call struct { - *mock.Call -} - -// Delete is a helper method to define mock.On call -// - ctx context.Context -func (_e *MockStorageObjectProvider_Expecter) Delete(ctx interface{}) *MockStorageObjectProvider_Delete_Call { - return &MockStorageObjectProvider_Delete_Call{Call: _e.mock.On("Delete", ctx)} -} - -func (_c *MockStorageObjectProvider_Delete_Call) Run(run func(ctx context.Context)) *MockStorageObjectProvider_Delete_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - run( - arg0, - ) - }) - return _c -} - -func (_c *MockStorageObjectProvider_Delete_Call) Return(err error) *MockStorageObjectProvider_Delete_Call { - _c.Call.Return(err) - return _c -} - -func (_c *MockStorageObjectProvider_Delete_Call) RunAndReturn(run func(ctx context.Context) error) *MockStorageObjectProvider_Delete_Call { - _c.Call.Return(run) - return _c -} - -// ReadAt provides a mock function for the type MockStorageObjectProvider -func (_mock *MockStorageObjectProvider) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { - ret := _mock.Called(ctx, p, off) - - if len(ret) == 0 { - panic("no return value specified for ReadAt") - } - - var r0 int - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, []byte, int64) (int, error)); ok { - return returnFunc(ctx, p, off) - } - if returnFunc, ok := ret.Get(0).(func(context.Context, []byte, int64) int); ok { - r0 = returnFunc(ctx, p, off) - } else { - r0 = ret.Get(0).(int) - } - if returnFunc, ok := ret.Get(1).(func(context.Context, []byte, int64) error); ok { - r1 = returnFunc(ctx, p, off) - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// MockStorageObjectProvider_ReadAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadAt' -type MockStorageObjectProvider_ReadAt_Call struct { - *mock.Call -} - -// ReadAt is a helper method to define mock.On call -// - ctx context.Context -// - p []byte -// - off int64 -func (_e *MockStorageObjectProvider_Expecter) ReadAt(ctx interface{}, p interface{}, off interface{}) *MockStorageObjectProvider_ReadAt_Call { - return &MockStorageObjectProvider_ReadAt_Call{Call: _e.mock.On("ReadAt", ctx, p, off)} -} - -func (_c *MockStorageObjectProvider_ReadAt_Call) Run(run func(ctx context.Context, p []byte, off int64)) *MockStorageObjectProvider_ReadAt_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 []byte - if args[1] != nil { - arg1 = args[1].([]byte) - } - var arg2 int64 - if args[2] != nil { - arg2 = args[2].(int64) - } - run( - arg0, - arg1, - arg2, - ) - }) - return _c -} - -func (_c *MockStorageObjectProvider_ReadAt_Call) Return(n int, err error) *MockStorageObjectProvider_ReadAt_Call { - _c.Call.Return(n, err) - return _c -} - -func (_c *MockStorageObjectProvider_ReadAt_Call) RunAndReturn(run func(ctx context.Context, p []byte, off int64) (int, error)) *MockStorageObjectProvider_ReadAt_Call { - _c.Call.Return(run) - return _c -} - -// Size provides a mock function for the type MockStorageObjectProvider -func (_mock *MockStorageObjectProvider) Size(ctx context.Context) (int64, error) { - ret := _mock.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for Size") - } - - var r0 int64 - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { - return returnFunc(ctx) - } - if returnFunc, ok := ret.Get(0).(func(context.Context) int64); ok { - r0 = returnFunc(ctx) - } else { - r0 = ret.Get(0).(int64) - } - if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { - r1 = returnFunc(ctx) - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// MockStorageObjectProvider_Size_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Size' -type MockStorageObjectProvider_Size_Call struct { - *mock.Call -} - -// Size is a helper method to define mock.On call -// - ctx context.Context -func (_e *MockStorageObjectProvider_Expecter) Size(ctx interface{}) *MockStorageObjectProvider_Size_Call { - return &MockStorageObjectProvider_Size_Call{Call: _e.mock.On("Size", ctx)} -} - -func (_c *MockStorageObjectProvider_Size_Call) Run(run func(ctx context.Context)) *MockStorageObjectProvider_Size_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - run( - arg0, - ) - }) - return _c -} - -func (_c *MockStorageObjectProvider_Size_Call) Return(n int64, err error) *MockStorageObjectProvider_Size_Call { - _c.Call.Return(n, err) - return _c -} - -func (_c *MockStorageObjectProvider_Size_Call) RunAndReturn(run func(ctx context.Context) (int64, error)) *MockStorageObjectProvider_Size_Call { - _c.Call.Return(run) - return _c -} - -// Write provides a mock function for the type MockStorageObjectProvider -func (_mock *MockStorageObjectProvider) Write(ctx context.Context, p []byte) (int, error) { - ret := _mock.Called(ctx, p) - - if len(ret) == 0 { - panic("no return value specified for Write") - } - - var r0 int - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) (int, error)); ok { - return returnFunc(ctx, p) - } - if returnFunc, ok := ret.Get(0).(func(context.Context, []byte) int); ok { - r0 = returnFunc(ctx, p) - } else { - r0 = ret.Get(0).(int) - } - if returnFunc, ok := ret.Get(1).(func(context.Context, []byte) error); ok { - r1 = returnFunc(ctx, p) - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// MockStorageObjectProvider_Write_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Write' -type MockStorageObjectProvider_Write_Call struct { - *mock.Call -} - -// Write is a helper method to define mock.On call -// - ctx context.Context -// - p []byte -func (_e *MockStorageObjectProvider_Expecter) Write(ctx interface{}, p interface{}) *MockStorageObjectProvider_Write_Call { - return &MockStorageObjectProvider_Write_Call{Call: _e.mock.On("Write", ctx, p)} -} - -func (_c *MockStorageObjectProvider_Write_Call) Run(run func(ctx context.Context, p []byte)) *MockStorageObjectProvider_Write_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 []byte - if args[1] != nil { - arg1 = args[1].([]byte) - } - run( - arg0, - arg1, - ) - }) - return _c -} - -func (_c *MockStorageObjectProvider_Write_Call) Return(n int, err error) *MockStorageObjectProvider_Write_Call { - _c.Call.Return(n, err) - return _c -} - -func (_c *MockStorageObjectProvider_Write_Call) RunAndReturn(run func(ctx context.Context, p []byte) (int, error)) *MockStorageObjectProvider_Write_Call { - _c.Call.Return(run) - return _c -} - -// WriteFromFileSystem provides a mock function for the type MockStorageObjectProvider -func (_mock *MockStorageObjectProvider) WriteFromFileSystem(ctx context.Context, path string) error { - ret := _mock.Called(ctx, path) - - if len(ret) == 0 { - panic("no return value specified for WriteFromFileSystem") - } - - var r0 error - if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok { - r0 = returnFunc(ctx, path) - } else { - r0 = ret.Error(0) - } - return r0 -} - -// MockStorageObjectProvider_WriteFromFileSystem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteFromFileSystem' -type MockStorageObjectProvider_WriteFromFileSystem_Call struct { - *mock.Call -} - -// WriteFromFileSystem is a helper method to define mock.On call -// - ctx context.Context -// - path string -func (_e *MockStorageObjectProvider_Expecter) WriteFromFileSystem(ctx interface{}, path interface{}) *MockStorageObjectProvider_WriteFromFileSystem_Call { - return &MockStorageObjectProvider_WriteFromFileSystem_Call{Call: _e.mock.On("WriteFromFileSystem", ctx, path)} -} - -func (_c *MockStorageObjectProvider_WriteFromFileSystem_Call) Run(run func(ctx context.Context, path string)) *MockStorageObjectProvider_WriteFromFileSystem_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 string - if args[1] != nil { - arg1 = args[1].(string) - } - run( - arg0, - arg1, - ) - }) - return _c -} - -func (_c *MockStorageObjectProvider_WriteFromFileSystem_Call) Return(err error) *MockStorageObjectProvider_WriteFromFileSystem_Call { - _c.Call.Return(err) - return _c -} - -func (_c *MockStorageObjectProvider_WriteFromFileSystem_Call) RunAndReturn(run func(ctx context.Context, path string) error) *MockStorageObjectProvider_WriteFromFileSystem_Call { - _c.Call.Return(run) - return _c -} - -// WriteTo provides a mock function for the type MockStorageObjectProvider -func (_mock *MockStorageObjectProvider) WriteTo(ctx context.Context, w io.Writer) (int64, error) { - ret := _mock.Called(ctx, w) - - if len(ret) == 0 { - panic("no return value specified for WriteTo") - } - - var r0 int64 - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, io.Writer) (int64, error)); ok { - return returnFunc(ctx, w) - } - if returnFunc, ok := ret.Get(0).(func(context.Context, io.Writer) int64); ok { - r0 = returnFunc(ctx, w) - } else { - r0 = ret.Get(0).(int64) - } - if returnFunc, ok := ret.Get(1).(func(context.Context, io.Writer) error); ok { - r1 = returnFunc(ctx, w) - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// MockStorageObjectProvider_WriteTo_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteTo' -type MockStorageObjectProvider_WriteTo_Call struct { - *mock.Call -} - -// WriteTo is a helper method to define mock.On call -// - ctx context.Context -// - w io.Writer -func (_e *MockStorageObjectProvider_Expecter) WriteTo(ctx interface{}, w interface{}) *MockStorageObjectProvider_WriteTo_Call { - return &MockStorageObjectProvider_WriteTo_Call{Call: _e.mock.On("WriteTo", ctx, w)} -} - -func (_c *MockStorageObjectProvider_WriteTo_Call) Run(run func(ctx context.Context, w io.Writer)) *MockStorageObjectProvider_WriteTo_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 io.Writer - if args[1] != nil { - arg1 = args[1].(io.Writer) - } - run( - arg0, - arg1, - ) - }) - return _c -} - -func (_c *MockStorageObjectProvider_WriteTo_Call) Return(n int64, err error) *MockStorageObjectProvider_WriteTo_Call { - _c.Call.Return(n, err) - return _c -} - -func (_c *MockStorageObjectProvider_WriteTo_Call) RunAndReturn(run func(ctx context.Context, w io.Writer) (int64, error)) *MockStorageObjectProvider_WriteTo_Call { - _c.Call.Return(run) - return _c -} diff --git a/packages/shared/pkg/storage/mocks/mockseekableobjectprovider.go b/packages/shared/pkg/storage/mocks/mockseekableobjectprovider.go new file mode 100644 index 0000000000..447bee242b --- /dev/null +++ b/packages/shared/pkg/storage/mocks/mockseekableobjectprovider.go @@ -0,0 +1,227 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package storagemocks + +import ( + "context" + + mock "github.com/stretchr/testify/mock" +) + +// NewMockSeekableObjectProvider creates a new instance of MockSeekableObjectProvider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockSeekableObjectProvider(t interface { + mock.TestingT + Cleanup(func()) +}) *MockSeekableObjectProvider { + mock := &MockSeekableObjectProvider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockSeekableObjectProvider is an autogenerated mock type for the SeekableObjectProvider type +type MockSeekableObjectProvider struct { + mock.Mock +} + +type MockSeekableObjectProvider_Expecter struct { + mock *mock.Mock +} + +func (_m *MockSeekableObjectProvider) EXPECT() *MockSeekableObjectProvider_Expecter { + return &MockSeekableObjectProvider_Expecter{mock: &_m.Mock} +} + +// ReadAt provides a mock function for the type MockSeekableObjectProvider +func (_mock *MockSeekableObjectProvider) ReadAt(ctx context.Context, p []byte, off int64) (int, error) { + ret := _mock.Called(ctx, p, off) + + if len(ret) == 0 { + panic("no return value specified for ReadAt") + } + + var r0 int + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte, int64) (int, error)); ok { + return returnFunc(ctx, p, off) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, []byte, int64) int); ok { + r0 = returnFunc(ctx, p, off) + } else { + r0 = ret.Get(0).(int) + } + if returnFunc, ok := ret.Get(1).(func(context.Context, []byte, int64) error); ok { + r1 = returnFunc(ctx, p, off) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockSeekableObjectProvider_ReadAt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ReadAt' +type MockSeekableObjectProvider_ReadAt_Call struct { + *mock.Call +} + +// ReadAt is a helper method to define mock.On call +// - ctx context.Context +// - p []byte +// - off int64 +func (_e *MockSeekableObjectProvider_Expecter) ReadAt(ctx interface{}, p interface{}, off interface{}) *MockSeekableObjectProvider_ReadAt_Call { + return &MockSeekableObjectProvider_ReadAt_Call{Call: _e.mock.On("ReadAt", ctx, p, off)} +} + +func (_c *MockSeekableObjectProvider_ReadAt_Call) Run(run func(ctx context.Context, p []byte, off int64)) *MockSeekableObjectProvider_ReadAt_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 []byte + if args[1] != nil { + arg1 = args[1].([]byte) + } + var arg2 int64 + if args[2] != nil { + arg2 = args[2].(int64) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockSeekableObjectProvider_ReadAt_Call) Return(n int, err error) *MockSeekableObjectProvider_ReadAt_Call { + _c.Call.Return(n, err) + return _c +} + +func (_c *MockSeekableObjectProvider_ReadAt_Call) RunAndReturn(run func(ctx context.Context, p []byte, off int64) (int, error)) *MockSeekableObjectProvider_ReadAt_Call { + _c.Call.Return(run) + return _c +} + +// Size provides a mock function for the type MockSeekableObjectProvider +func (_mock *MockSeekableObjectProvider) Size(ctx context.Context) (int64, error) { + ret := _mock.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Size") + } + + var r0 int64 + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context) (int64, error)); ok { + return returnFunc(ctx) + } + if returnFunc, ok := ret.Get(0).(func(context.Context) int64); ok { + r0 = returnFunc(ctx) + } else { + r0 = ret.Get(0).(int64) + } + if returnFunc, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = returnFunc(ctx) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockSeekableObjectProvider_Size_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Size' +type MockSeekableObjectProvider_Size_Call struct { + *mock.Call +} + +// Size is a helper method to define mock.On call +// - ctx context.Context +func (_e *MockSeekableObjectProvider_Expecter) Size(ctx interface{}) *MockSeekableObjectProvider_Size_Call { + return &MockSeekableObjectProvider_Size_Call{Call: _e.mock.On("Size", ctx)} +} + +func (_c *MockSeekableObjectProvider_Size_Call) Run(run func(ctx context.Context)) *MockSeekableObjectProvider_Size_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + run( + arg0, + ) + }) + return _c +} + +func (_c *MockSeekableObjectProvider_Size_Call) Return(n int64, err error) *MockSeekableObjectProvider_Size_Call { + _c.Call.Return(n, err) + return _c +} + +func (_c *MockSeekableObjectProvider_Size_Call) RunAndReturn(run func(ctx context.Context) (int64, error)) *MockSeekableObjectProvider_Size_Call { + _c.Call.Return(run) + return _c +} + +// WriteFromFileSystem provides a mock function for the type MockSeekableObjectProvider +func (_mock *MockSeekableObjectProvider) WriteFromFileSystem(ctx context.Context, path string) error { + ret := _mock.Called(ctx, path) + + if len(ret) == 0 { + panic("no return value specified for WriteFromFileSystem") + } + + var r0 error + if returnFunc, ok := ret.Get(0).(func(context.Context, string) error); ok { + r0 = returnFunc(ctx, path) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// MockSeekableObjectProvider_WriteFromFileSystem_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WriteFromFileSystem' +type MockSeekableObjectProvider_WriteFromFileSystem_Call struct { + *mock.Call +} + +// WriteFromFileSystem is a helper method to define mock.On call +// - ctx context.Context +// - path string +func (_e *MockSeekableObjectProvider_Expecter) WriteFromFileSystem(ctx interface{}, path interface{}) *MockSeekableObjectProvider_WriteFromFileSystem_Call { + return &MockSeekableObjectProvider_WriteFromFileSystem_Call{Call: _e.mock.On("WriteFromFileSystem", ctx, path)} +} + +func (_c *MockSeekableObjectProvider_WriteFromFileSystem_Call) Run(run func(ctx context.Context, path string)) *MockSeekableObjectProvider_WriteFromFileSystem_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 string + if args[1] != nil { + arg1 = args[1].(string) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockSeekableObjectProvider_WriteFromFileSystem_Call) Return(err error) *MockSeekableObjectProvider_WriteFromFileSystem_Call { + _c.Call.Return(err) + return _c +} + +func (_c *MockSeekableObjectProvider_WriteFromFileSystem_Call) RunAndReturn(run func(ctx context.Context, path string) error) *MockSeekableObjectProvider_WriteFromFileSystem_Call { + _c.Call.Return(run) + return _c +} diff --git a/packages/shared/pkg/storage/storage.go b/packages/shared/pkg/storage/storage.go index d5eaaaa360..a26ee1439b 100644 --- a/packages/shared/pkg/storage/storage.go +++ b/packages/shared/pkg/storage/storage.go @@ -14,7 +14,10 @@ import ( "github.com/e2b-dev/infra/packages/shared/pkg/utils" ) -var tracer = otel.Tracer("github.com/e2b-dev/infra/packages/shared/pkg/storage") +var ( + tracer = otel.Tracer("github.com/e2b-dev/infra/packages/shared/pkg/storage") + meter = otel.GetMeterProvider().Meter("shared.pkg.storage") +) var ErrObjectNotExist = errors.New("object does not exist") @@ -33,10 +36,31 @@ const ( MemoryChunkSize = 4 * 1024 * 1024 // 4 MB ) +type SeekableObjectType int + +const ( + UnknownSeekableObjectType SeekableObjectType = iota + MemfileObjectType + RootFSObjectType +) + +type ObjectType int + +const ( + UnknownObjectType ObjectType = iota + MemfileHeaderObjectType + RootFSHeaderObjectType + SnapfileObjectType + MetadataObjectType + BuildLayerFileObjectType + LayerMetadataObjectType +) + type StorageProvider interface { DeleteObjectsWithPrefix(ctx context.Context, prefix string) error UploadSignedURL(ctx context.Context, path string, ttl time.Duration) (string, error) - OpenObject(ctx context.Context, path string) (StorageObjectProvider, error) + OpenObject(ctx context.Context, path string, objectType ObjectType) (ObjectProvider, error) + OpenSeekableObject(ctx context.Context, path string, seekableObjectType SeekableObjectType) (SeekableObjectProvider, error) GetDetails() string } @@ -52,15 +76,27 @@ type ReaderAtCtx interface { ReadAt(ctx context.Context, p []byte, off int64) (n int, err error) } -type StorageObjectProvider interface { +type ObjectProvider interface { + // write WriterCtx + WriteFromFileSystem(ctx context.Context, path string) error + + // read WriterToCtx - ReaderAtCtx + // utility + Exists(ctx context.Context) (bool, error) +} + +type SeekableObjectProvider interface { + // write WriteFromFileSystem(ctx context.Context, path string) error + // read + ReaderAtCtx + + // utility Size(ctx context.Context) (int64, error) - Delete(ctx context.Context) error } func GetTemplateStorageProvider(ctx context.Context, limiter *limit.Limiter) (StorageProvider, error) { diff --git a/packages/shared/pkg/storage/storage_aws.go b/packages/shared/pkg/storage/storage_aws.go index c89bd312b2..bced012afd 100644 --- a/packages/shared/pkg/storage/storage_aws.go +++ b/packages/shared/pkg/storage/storage_aws.go @@ -37,7 +37,10 @@ type AWSBucketStorageObjectProvider struct { bucketName string } -var _ StorageObjectProvider = (*AWSBucketStorageObjectProvider)(nil) +var ( + _ SeekableObjectProvider = (*AWSBucketStorageObjectProvider)(nil) + _ ObjectProvider = (*AWSBucketStorageObjectProvider)(nil) +) func NewAWSBucketStorageProvider(ctx context.Context, bucketName string) (*AWSBucketStorageProvider, error) { cfg, err := config.LoadDefaultConfig(ctx) @@ -121,7 +124,15 @@ func (a *AWSBucketStorageProvider) UploadSignedURL(ctx context.Context, path str return resp.URL, nil } -func (a *AWSBucketStorageProvider) OpenObject(_ context.Context, path string) (StorageObjectProvider, error) { +func (a *AWSBucketStorageProvider) OpenSeekableObject(_ context.Context, path string, _ SeekableObjectType) (SeekableObjectProvider, error) { + return &AWSBucketStorageObjectProvider{ + client: a.client, + bucketName: a.bucketName, + path: path, + }, nil +} + +func (a *AWSBucketStorageProvider) OpenObject(_ context.Context, path string, _ ObjectType) (ObjectProvider, error) { return &AWSBucketStorageObjectProvider{ client: a.client, bucketName: a.bucketName, @@ -249,6 +260,12 @@ func (a *AWSBucketStorageObjectProvider) Size(ctx context.Context) (int64, error return *resp.ContentLength, nil } +func (a *AWSBucketStorageObjectProvider) Exists(ctx context.Context) (bool, error) { + _, err := a.Size(ctx) + + return err == nil, ignoreNotExists(err) +} + func (a *AWSBucketStorageObjectProvider) Delete(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, awsOperationTimeout) defer cancel() @@ -262,3 +279,11 @@ func (a *AWSBucketStorageObjectProvider) Delete(ctx context.Context) error { return err } + +func ignoreNotExists(err error) error { + if errors.Is(err, ErrObjectNotExist) { + return nil + } + + return err +} diff --git a/packages/shared/pkg/storage/storage_cache.go b/packages/shared/pkg/storage/storage_cache.go index e2484a25e5..11af173a7f 100644 --- a/packages/shared/pkg/storage/storage_cache.go +++ b/packages/shared/pkg/storage/storage_cache.go @@ -1,21 +1,15 @@ package storage import ( - "bytes" "context" "errors" "fmt" "io" "os" "path/filepath" - "strconv" "time" - "github.com/google/uuid" - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/e2b-dev/infra/packages/shared/pkg/telemetry" @@ -28,7 +22,6 @@ const ( ) var ( - meter = otel.GetMeterProvider().Meter("shared.pkg.storage") cacheReadTimerFactory = utils.Must(telemetry.NewTimerFactory(meter, "orchestrator.storage.cache.read", "Duration of cached reads", @@ -67,8 +60,8 @@ func (c CachedProvider) UploadSignedURL(ctx context.Context, path string, ttl ti return c.inner.UploadSignedURL(ctx, path, ttl) } -func (c CachedProvider) OpenObject(ctx context.Context, path string) (StorageObjectProvider, error) { - innerObject, err := c.inner.OpenObject(ctx, path) +func (c CachedProvider) OpenObject(ctx context.Context, path string, objectType ObjectType) (ObjectProvider, error) { + innerObject, err := c.inner.OpenObject(ctx, path, objectType) if err != nil { return nil, fmt.Errorf("failed to open object: %w", err) } @@ -78,336 +71,26 @@ func (c CachedProvider) OpenObject(ctx context.Context, path string) (StorageObj return nil, fmt.Errorf("failed to create cache directory: %w", err) } - return &CachedFileObjectProvider{path: localPath, chunkSize: c.chunkSize, inner: innerObject}, nil + return &CachedObjectProvider{path: localPath, chunkSize: c.chunkSize, inner: innerObject}, nil } -func (c CachedProvider) GetDetails() string { - return fmt.Sprintf("[Caching file storage, base path set to %s, which wraps %s]", - c.rootPath, c.inner.GetDetails()) -} - -type CachedFileObjectProvider struct { - path string - chunkSize int64 - inner StorageObjectProvider -} - -var _ StorageObjectProvider = (*CachedFileObjectProvider)(nil) - -// WriteTo is used for very small files and we can check against their size to ensure the content is valid. -func (c *CachedFileObjectProvider) WriteTo(ctx context.Context, dst io.Writer) (int64, error) { - ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.WriteTo") - defer span.End() - - if bytesRead, ok := c.copyFullFileFromCache(ctx, dst); ok { - return bytesRead, nil - } - - return c.readAndCacheFullRemoteFile(ctx, dst) -} - -func (c *CachedFileObjectProvider) WriteFromFileSystem(ctx context.Context, path string) error { - return c.inner.WriteFromFileSystem(ctx, path) -} - -func (c *CachedFileObjectProvider) Write(ctx context.Context, src []byte) (int, error) { - return c.inner.Write(ctx, src) -} - -func (c *CachedFileObjectProvider) ReadAt(ctx context.Context, buff []byte, offset int64) (int, error) { - ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.ReadAt", trace.WithAttributes( - attribute.Int64("offset", offset), - attribute.Int("buff_len", len(buff)), - )) - defer span.End() - - if err := c.validateReadAtParams(int64(len(buff)), offset); err != nil { - return 0, err - } - - // try to read from cache first - chunkPath := c.makeChunkFilename(offset) - - readTimer := cacheReadTimerFactory.Begin() - count, err := c.readAtFromCache(chunkPath, buff) - if ignoreEOF(err) == nil { - cacheHits.Add(ctx, 1) - readTimer.End(ctx, int64(count)) - - return count, err // return `err` in case it's io.EOF - } - cacheMisses.Add(ctx, 1) - - zap.L().Debug("failed to read cached chunk, falling back to remote read", - zap.String("chunk_path", chunkPath), - zap.Int64("offset", offset), - zap.Error(err)) - - // read remote file - readCount, err := c.inner.ReadAt(ctx, buff, offset) - if err != nil { - return 0, fmt.Errorf("failed to perform uncached read: %w", err) - } - - go func() { - c.writeChunkToCache(context.WithoutCancel(ctx), offset, chunkPath, buff[:readCount]) - }() - - return readCount, nil -} - -var ( - ErrOffsetUnaligned = errors.New("offset must be a multiple of chunk size") - ErrBufferTooSmall = errors.New("buffer is too small") - ErrMultipleChunks = errors.New("cannot read multiple chunks") - ErrBufferTooLarge = errors.New("buffer is too large") -) - -func (c *CachedFileObjectProvider) Size(ctx context.Context) (int64, error) { - if size, ok := c.readLocalSize(); ok { - cacheHits.Add(ctx, 1) - - return size, nil - } - cacheMisses.Add(ctx, 1) - - size, err := c.inner.Size(ctx) - if err != nil { - return 0, err - } - - go c.writeLocalSize(size) - - return size, nil -} - -func (c *CachedFileObjectProvider) Delete(ctx context.Context) error { - return c.inner.Delete(ctx) -} - -func (c *CachedFileObjectProvider) readLocalSize() (int64, bool) { - fname := c.sizeFilename() - content, err := os.ReadFile(fname) - if err != nil { - zap.L().Warn("failed to read cached size, falling back to remote read", - zap.String("path", fname), - zap.Error(err)) - - return 0, false - } - - size, err := strconv.ParseInt(string(content), 10, 64) - if err != nil { - zap.L().Error("failed to parse cached size, falling back to remote read", - zap.String("path", fname), - zap.String("content", string(content)), - zap.Error(err)) - - return 0, false - } - - return size, true -} - -func (c *CachedFileObjectProvider) validateReadAtParams(buffSize, offset int64) error { - if buffSize == 0 { - return ErrBufferTooSmall - } - if buffSize > c.chunkSize { - return ErrBufferTooLarge - } - if offset%c.chunkSize != 0 { - return ErrOffsetUnaligned - } - if (offset%c.chunkSize)+buffSize > c.chunkSize { - return ErrMultipleChunks - } - - return nil -} - -func (c *CachedFileObjectProvider) sizeFilename() string { - return filepath.Join(c.path, "size.txt") -} - -func (c *CachedFileObjectProvider) writeLocalSize(size int64) { - tempFilename := filepath.Join(c.path, fmt.Sprintf(".size.bin.%s", uuid.NewString())) - - if err := os.WriteFile(tempFilename, []byte(fmt.Sprintf("%d", size)), cacheFilePermissions); err != nil { - zap.L().Warn("failed to write to temp file", - zap.String("path", tempFilename), - zap.Error(err)) - - return - } - - finalFilename := c.sizeFilename() - if err := moveWithoutReplace(tempFilename, finalFilename); err != nil { - zap.L().Warn("failed to move temp file", - zap.String("temp_path", tempFilename), - zap.String("final_path", finalFilename), - zap.Error(err)) - - return - } -} - -func (c *CachedFileObjectProvider) tempFullFilename() string { - tempFilename := uuid.NewString() - - return fmt.Sprintf("%s/.temp.content.bin.%s", c.path, tempFilename) -} - -func (c *CachedFileObjectProvider) fullFilename() string { - return fmt.Sprintf("%s/content.bin", c.path) -} - -func (c *CachedFileObjectProvider) makeTempChunkFilename(offset int64) string { - tempFilename := uuid.NewString() - - return fmt.Sprintf("%s/.temp.%012d-%d.bin.%s", c.path, offset/c.chunkSize, c.chunkSize, tempFilename) -} - -func (c *CachedFileObjectProvider) makeChunkFilename(offset int64) string { - return fmt.Sprintf("%s/%012d-%d.bin", c.path, offset/c.chunkSize, c.chunkSize) -} - -func (c *CachedFileObjectProvider) writeChunkToCache(ctx context.Context, offset int64, chunkPath string, bytes []byte) { - writeTimer := cacheWriteTimerFactory.Begin() - - tempPath := c.makeTempChunkFilename(offset) - - if err := os.WriteFile(tempPath, bytes, cacheFilePermissions); err != nil { - zap.L().Error("failed to write temp cache file", - zap.String("tempPath", tempPath), - zap.String("chunkPath", chunkPath), - zap.Int64("offset", offset), - zap.Int("length", len(bytes)), - zap.Error(err), - ) - - return - } - - if err := moveWithoutReplace(tempPath, chunkPath); err != nil { - zap.L().Error("failed to rename temp file", - zap.String("tempPath", tempPath), - zap.String("chunkPath", chunkPath), - zap.Int64("offset", offset), - zap.Int("length", len(bytes)), - zap.Error(err), - ) - - return - } - - writeTimer.End(ctx, int64(len(bytes))) -} - -func (c *CachedFileObjectProvider) writeFullFileToCache(ctx context.Context, b []byte) { - timer := cacheWriteTimerFactory.Begin() - - tempPath := c.tempFullFilename() - - if err := os.WriteFile(tempPath, b, cacheFilePermissions); err != nil { - zap.L().Error("failed to write temp cache file", - zap.String("path", tempPath), - zap.Int("length", len(b)), - zap.Error(err), - ) - - return - } - - finalPath := c.fullFilename() - if err := moveWithoutReplace(tempPath, finalPath); err != nil { - zap.L().Error("failed to rename temp file", - zap.String("tempPath", tempPath), - zap.String("filePath", finalPath), - zap.Int("length", len(b)), - zap.Error(err), - ) - - return - } - - timer.End(ctx, int64(len(b))) -} - -func (c *CachedFileObjectProvider) readAtFromCache(chunkPath string, buff []byte) (int, error) { - var fp *os.File - fp, err := os.Open(chunkPath) +func (c CachedProvider) OpenSeekableObject(ctx context.Context, path string, objectType SeekableObjectType) (SeekableObjectProvider, error) { + innerObject, err := c.inner.OpenSeekableObject(ctx, path, objectType) if err != nil { - return 0, fmt.Errorf("failed to open file: %w", err) - } - - defer cleanup("failed to close chunk", fp.Close) - - count, err := fp.ReadAt(buff, 0) // offset is in the filename - if ignoreEOF(err) != nil { - return 0, fmt.Errorf("failed to read from chunk: %w", err) - } - - return count, err // return `err` in case it's io.EOF -} - -func (c *CachedFileObjectProvider) copyFullFileFromCache(ctx context.Context, dst io.Writer) (int64, bool) { - cachedRead := cacheReadTimerFactory.Begin() - - path := c.fullFilename() - - var fp *os.File - fp, err := os.Open(path) - if err != nil { - if !os.IsNotExist(err) { - zap.L().Error("failed to open full cached file", - zap.String("path", path), - zap.Error(err)) - } - - return 0, false + return nil, fmt.Errorf("failed to open object: %w", err) } - defer cleanup("failed to close full cached file", fp.Close) - - count, err := io.Copy(dst, fp) - if ignoreEOF(err) != nil { - zap.L().Error("failed to read full cached file", - zap.String("path", path), - zap.Error(err)) - - return 0, false + localPath := filepath.Join(c.rootPath, path) + if err = os.MkdirAll(localPath, cacheDirPermissions); err != nil { + return nil, fmt.Errorf("failed to create cache directory: %w", err) } - cachedRead.End(ctx, count) - - return count, true + return &CachedSeekableObjectProvider{path: localPath, chunkSize: c.chunkSize, inner: innerObject}, nil } -const ( - kilobyte = 1024 - megabyte = 1024 * kilobyte -) - -func (c *CachedFileObjectProvider) readAndCacheFullRemoteFile(ctx context.Context, dst io.Writer) (int64, error) { - // This is semi-arbitrary. this code path is called for files that tend to be less than 1 MB (headers, metadata, etc), - // so 2 MB allows us to read the file without needing to allocate more memory, with some room for growth. If the - // file is larger than 2 MB, the buffer will grow, it just won't be as efficient WRT memory allocations. - const writeToInitialBufferSize = 2 * megabyte - - writer := bytes.NewBuffer(make([]byte, 0, writeToInitialBufferSize)) - - if _, err := c.inner.WriteTo(ctx, writer); ignoreEOF(err) != nil { - return 0, err - } - - go func() { - c.writeFullFileToCache(context.WithoutCancel(ctx), writer.Bytes()) - }() - - written, err := dst.Write(writer.Bytes()) - - return int64(written), err +func (c CachedProvider) GetDetails() string { + return fmt.Sprintf("[Caching file storage, base path set to %s, which wraps %s]", + c.rootPath, c.inner.GetDetails()) } func cleanup(msg string, fn func() error) { diff --git a/packages/shared/pkg/storage/storage_cache_object.go b/packages/shared/pkg/storage/storage_cache_object.go new file mode 100644 index 0000000000..a5320f1d42 --- /dev/null +++ b/packages/shared/pkg/storage/storage_cache_object.go @@ -0,0 +1,142 @@ +package storage + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + + "github.com/google/uuid" + "go.uber.org/zap" +) + +const ( + kilobyte = 1024 + megabyte = 1024 * kilobyte +) + +type CachedObjectProvider struct { + path string + chunkSize int64 + inner ObjectProvider +} + +var _ ObjectProvider = CachedObjectProvider{} + +func (c CachedObjectProvider) Exists(ctx context.Context) (bool, error) { + return c.inner.Exists(ctx) +} + +func (c CachedObjectProvider) WriteTo(ctx context.Context, dst io.Writer) (n int64, err error) { + ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.WriteTo") + defer span.End() + + if bytesRead, ok := c.copyFullFileFromCache(ctx, dst); ok { + return bytesRead, nil + } + + return c.readAndCacheFullRemoteFile(ctx, dst) +} + +func (c CachedObjectProvider) Write(ctx context.Context, p []byte) (n int, err error) { + return c.inner.Write(ctx, p) +} + +func (c CachedObjectProvider) WriteFromFileSystem(ctx context.Context, path string) error { + return c.inner.WriteFromFileSystem(ctx, path) +} + +func (c CachedObjectProvider) fullFilename() string { + return fmt.Sprintf("%s/content.bin", c.path) +} + +func (c CachedObjectProvider) tempFullFilename() string { + tempFilename := uuid.NewString() + + return fmt.Sprintf("%s/.temp.content.bin.%s", c.path, tempFilename) +} + +func (c CachedObjectProvider) copyFullFileFromCache(ctx context.Context, dst io.Writer) (int64, bool) { + cachedRead := cacheReadTimerFactory.Begin() + + path := c.fullFilename() + + var fp *os.File + fp, err := os.Open(path) + if err != nil { + if !os.IsNotExist(err) { + zap.L().Error("failed to open full cached file", + zap.String("path", path), + zap.Error(err)) + } + + return 0, false + } + + defer cleanup("failed to close full cached file", fp.Close) + + count, err := io.Copy(dst, fp) + if ignoreEOF(err) != nil { + zap.L().Error("failed to read full cached file", + zap.String("path", path), + zap.Error(err)) + + return 0, false + } + + cachedRead.End(ctx, count) + + return count, true +} + +func (c CachedObjectProvider) readAndCacheFullRemoteFile(ctx context.Context, dst io.Writer) (int64, error) { + // This is semi-arbitrary. this code path is called for files that tend to be less than 1 MB (headers, metadata, etc), + // so 2 MB allows us to read the file without needing to allocate more memory, with some room for growth. If the + // file is larger than 2 MB, the buffer will grow, it just won't be as efficient WRT memory allocations. + const writeToInitialBufferSize = 2 * megabyte + + writer := bytes.NewBuffer(make([]byte, 0, writeToInitialBufferSize)) + + if _, err := c.inner.WriteTo(ctx, writer); ignoreEOF(err) != nil { + return 0, err + } + + go func() { + c.writeFullFileToCache(context.WithoutCancel(ctx), writer.Bytes()) + }() + + written, err := dst.Write(writer.Bytes()) + + return int64(written), err +} + +func (c CachedObjectProvider) writeFullFileToCache(ctx context.Context, b []byte) { + timer := cacheWriteTimerFactory.Begin() + + tempPath := c.tempFullFilename() + + if err := os.WriteFile(tempPath, b, cacheFilePermissions); err != nil { + zap.L().Error("failed to write temp cache file", + zap.String("path", tempPath), + zap.Int("length", len(b)), + zap.Error(err), + ) + + return + } + + finalPath := c.fullFilename() + if err := moveWithoutReplace(tempPath, finalPath); err != nil { + zap.L().Error("failed to rename temp file", + zap.String("tempPath", tempPath), + zap.String("filePath", finalPath), + zap.Int("length", len(b)), + zap.Error(err), + ) + + return + } + + timer.End(ctx, int64(len(b))) +} diff --git a/packages/shared/pkg/storage/storage_cache_seekable.go b/packages/shared/pkg/storage/storage_cache_seekable.go new file mode 100644 index 0000000000..52380efc70 --- /dev/null +++ b/packages/shared/pkg/storage/storage_cache_seekable.go @@ -0,0 +1,220 @@ +package storage + +import ( + "context" + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" +) + +var ( + ErrOffsetUnaligned = errors.New("offset must be a multiple of chunk size") + ErrBufferTooSmall = errors.New("buffer is too small") + ErrMultipleChunks = errors.New("cannot read multiple chunks") + ErrBufferTooLarge = errors.New("buffer is too large") +) + +type CachedSeekableObjectProvider struct { + path string + chunkSize int64 + inner SeekableObjectProvider +} + +var _ SeekableObjectProvider = CachedSeekableObjectProvider{} + +func (c CachedSeekableObjectProvider) ReadAt(ctx context.Context, buff []byte, offset int64) (n int, err error) { + ctx, span := tracer.Start(ctx, "CachedFileObjectProvider.ReadAt", trace.WithAttributes( + attribute.Int64("offset", offset), + attribute.Int("buff_len", len(buff)), + )) + defer span.End() + + if err := c.validateReadAtParams(int64(len(buff)), offset); err != nil { + return 0, err + } + + // try to read from cache first + chunkPath := c.makeChunkFilename(offset) + + readTimer := cacheReadTimerFactory.Begin() + count, err := c.readAtFromCache(chunkPath, buff) + if ignoreEOF(err) == nil { + cacheHits.Add(ctx, 1) + readTimer.End(ctx, int64(count)) + + return count, err // return `err` in case it's io.EOF + } + cacheMisses.Add(ctx, 1) + + zap.L().Debug("failed to read cached chunk, falling back to remote read", + zap.String("chunk_path", chunkPath), + zap.Int64("offset", offset), + zap.Error(err)) + + // read remote file + readCount, err := c.inner.ReadAt(ctx, buff, offset) + if err != nil { + return 0, fmt.Errorf("failed to perform uncached read: %w", err) + } + + go func() { + c.writeChunkToCache(context.WithoutCancel(ctx), offset, chunkPath, buff[:readCount]) + }() + + return readCount, nil +} + +func (c CachedSeekableObjectProvider) Size(ctx context.Context) (int64, error) { + if size, ok := c.readLocalSize(); ok { + cacheHits.Add(ctx, 1) + + return size, nil + } + cacheMisses.Add(ctx, 1) + + size, err := c.inner.Size(ctx) + if err != nil { + return 0, err + } + + go c.writeLocalSize(size) + + return size, nil +} + +func (c CachedSeekableObjectProvider) WriteFromFileSystem(ctx context.Context, path string) error { + return c.inner.WriteFromFileSystem(ctx, path) +} + +func (c CachedSeekableObjectProvider) makeChunkFilename(offset int64) string { + return fmt.Sprintf("%s/%012d-%d.bin", c.path, offset/c.chunkSize, c.chunkSize) +} + +func (c CachedSeekableObjectProvider) makeTempChunkFilename(offset int64) string { + tempFilename := uuid.NewString() + + return fmt.Sprintf("%s/.temp.%012d-%d.bin.%s", c.path, offset/c.chunkSize, c.chunkSize, tempFilename) +} + +func (c CachedSeekableObjectProvider) readAtFromCache(chunkPath string, buff []byte) (int, error) { + var fp *os.File + fp, err := os.Open(chunkPath) + if err != nil { + return 0, fmt.Errorf("failed to open file: %w", err) + } + + defer cleanup("failed to close chunk", fp.Close) + + count, err := fp.ReadAt(buff, 0) // offset is in the filename + if ignoreEOF(err) != nil { + return 0, fmt.Errorf("failed to read from chunk: %w", err) + } + + return count, err // return `err` in case it's io.EOF +} + +func (c CachedSeekableObjectProvider) sizeFilename() string { + return filepath.Join(c.path, "size.txt") +} + +func (c CachedSeekableObjectProvider) readLocalSize() (int64, bool) { + fname := c.sizeFilename() + content, err := os.ReadFile(fname) + if err != nil { + zap.L().Warn("failed to read cached size, falling back to remote read", + zap.String("path", fname), + zap.Error(err)) + + return 0, false + } + + size, err := strconv.ParseInt(string(content), 10, 64) + if err != nil { + zap.L().Error("failed to parse cached size, falling back to remote read", + zap.String("path", fname), + zap.String("content", string(content)), + zap.Error(err)) + + return 0, false + } + + return size, true +} + +func (c CachedSeekableObjectProvider) validateReadAtParams(buffSize, offset int64) error { + if buffSize == 0 { + return ErrBufferTooSmall + } + if buffSize > c.chunkSize { + return ErrBufferTooLarge + } + if offset%c.chunkSize != 0 { + return ErrOffsetUnaligned + } + if (offset%c.chunkSize)+buffSize > c.chunkSize { + return ErrMultipleChunks + } + + return nil +} + +func (c CachedSeekableObjectProvider) writeChunkToCache(ctx context.Context, offset int64, chunkPath string, bytes []byte) { + writeTimer := cacheWriteTimerFactory.Begin() + + tempPath := c.makeTempChunkFilename(offset) + + if err := os.WriteFile(tempPath, bytes, cacheFilePermissions); err != nil { + zap.L().Error("failed to write temp cache file", + zap.String("tempPath", tempPath), + zap.String("chunkPath", chunkPath), + zap.Int64("offset", offset), + zap.Int("length", len(bytes)), + zap.Error(err), + ) + + return + } + + if err := moveWithoutReplace(tempPath, chunkPath); err != nil { + zap.L().Error("failed to rename temp file", + zap.String("tempPath", tempPath), + zap.String("chunkPath", chunkPath), + zap.Int64("offset", offset), + zap.Int("length", len(bytes)), + zap.Error(err), + ) + + return + } + + writeTimer.End(ctx, int64(len(bytes))) +} + +func (c CachedSeekableObjectProvider) writeLocalSize(size int64) { + tempFilename := filepath.Join(c.path, fmt.Sprintf(".size.bin.%s", uuid.NewString())) + + if err := os.WriteFile(tempFilename, []byte(fmt.Sprintf("%d", size)), cacheFilePermissions); err != nil { + zap.L().Warn("failed to write to temp file", + zap.String("path", tempFilename), + zap.Error(err)) + + return + } + + finalFilename := c.sizeFilename() + if err := moveWithoutReplace(tempFilename, finalFilename); err != nil { + zap.L().Warn("failed to move temp file", + zap.String("temp_path", tempFilename), + zap.String("final_path", finalFilename), + zap.Error(err)) + + return + } +} diff --git a/packages/shared/pkg/storage/storage_cache_test.go b/packages/shared/pkg/storage/storage_cache_test.go index c7f01a19b4..fa5950c1c2 100644 --- a/packages/shared/pkg/storage/storage_cache_test.go +++ b/packages/shared/pkg/storage/storage_cache_test.go @@ -17,7 +17,7 @@ import ( ) func TestCachedFileObjectProvider_MakeChunkFilename(t *testing.T) { - c := CachedFileObjectProvider{path: "/a/b/c", chunkSize: 1024} + c := CachedSeekableObjectProvider{path: "/a/b/c", chunkSize: 1024} filename := c.makeChunkFilename(1024 * 4) assert.Equal(t, "/a/b/c/000000000004-1024.bin", filename) } @@ -26,10 +26,10 @@ func TestCachedFileObjectProvider_Size(t *testing.T) { t.Run("can be cached successfully", func(t *testing.T) { const expectedSize int64 = 1024 - inner := storagemocks.NewMockStorageObjectProvider(t) + inner := storagemocks.NewMockSeekableObjectProvider(t) inner.EXPECT().Size(mock.Anything).Return(expectedSize, nil) - c := CachedFileObjectProvider{path: t.TempDir(), inner: inner} + c := CachedSeekableObjectProvider{path: t.TempDir(), inner: inner} // first call will write to cache size, err := c.Size(t.Context()) @@ -53,7 +53,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { tempDir := t.TempDir() tempPath := filepath.Join(tempDir, "a", "b", "c") - c := CachedFileObjectProvider{path: tempPath, chunkSize: 3} + c := CachedSeekableObjectProvider{path: tempPath, chunkSize: 3} // create cache file cacheFilename := c.makeChunkFilename(0) @@ -72,7 +72,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { t.Run("consecutive ReadAt calls should cache", func(t *testing.T) { fakeData := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - fakeStorageObjectProvider := storagemocks.NewMockStorageObjectProvider(t) + fakeStorageObjectProvider := storagemocks.NewMockSeekableObjectProvider(t) fakeStorageObjectProvider.EXPECT(). ReadAt(mock.Anything, mock.Anything, mock.Anything). @@ -86,7 +86,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { }) tempDir := t.TempDir() - c := CachedFileObjectProvider{ + c := CachedSeekableObjectProvider{ path: tempDir, chunkSize: 3, inner: fakeStorageObjectProvider, @@ -114,7 +114,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { t.Run("WriteTo calls should read from cache", func(t *testing.T) { fakeData := []byte{1, 2, 3} - fakeStorageObjectProvider := storagemocks.NewMockStorageObjectProvider(t) + fakeStorageObjectProvider := storagemocks.NewMockObjectProvider(t) fakeStorageObjectProvider.EXPECT(). WriteTo(mock.Anything, mock.Anything). RunAndReturn(func(_ context.Context, dst io.Writer) (int64, error) { @@ -124,7 +124,7 @@ func TestCachedFileObjectProvider_WriteTo(t *testing.T) { }) tempDir := t.TempDir() - c := CachedFileObjectProvider{ + c := CachedObjectProvider{ path: tempDir, chunkSize: 3, inner: fakeStorageObjectProvider, @@ -184,7 +184,7 @@ func TestCachedFileObjectProvider_validateReadAtParams(t *testing.T) { for name, tc := range testcases { t.Run(name, func(t *testing.T) { - c := CachedFileObjectProvider{ + c := CachedSeekableObjectProvider{ chunkSize: tc.chunkSize, } err := c.validateReadAtParams(tc.bufferSize, tc.offset) diff --git a/packages/shared/pkg/storage/storage_fs.go b/packages/shared/pkg/storage/storage_fs.go index 68b8b49a0b..a5cf472222 100644 --- a/packages/shared/pkg/storage/storage_fs.go +++ b/packages/shared/pkg/storage/storage_fs.go @@ -20,7 +20,10 @@ type FileSystemStorageObjectProvider struct { path string } -var _ StorageObjectProvider = (*FileSystemStorageObjectProvider)(nil) +var ( + _ SeekableObjectProvider = (*FileSystemStorageObjectProvider)(nil) + _ ObjectProvider = (*FileSystemStorageObjectProvider)(nil) +) func NewFileSystemStorageProvider(basePath string) (*FileSystemStorageProvider, error) { return &FileSystemStorageProvider{ @@ -43,7 +46,18 @@ func (fs *FileSystemStorageProvider) UploadSignedURL(_ context.Context, _ string return "", fmt.Errorf("file system storage does not support signed URLs") } -func (fs *FileSystemStorageProvider) OpenObject(_ context.Context, path string) (StorageObjectProvider, error) { +func (fs *FileSystemStorageProvider) OpenSeekableObject(_ context.Context, path string, _ SeekableObjectType) (SeekableObjectProvider, error) { + dir := filepath.Dir(fs.getPath(path)) + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, err + } + + return &FileSystemStorageObjectProvider{ + path: fs.getPath(path), + }, nil +} + +func (fs *FileSystemStorageProvider) OpenObject(_ context.Context, path string, _ ObjectType) (ObjectProvider, error) { dir := filepath.Dir(fs.getPath(path)) if err := os.MkdirAll(dir, 0o755); err != nil { return nil, err @@ -112,6 +126,15 @@ func (f *FileSystemStorageObjectProvider) ReadAt(_ context.Context, buff []byte, return handle.ReadAt(buff, off) } +func (f *FileSystemStorageObjectProvider) Exists(_ context.Context) (bool, error) { + _, err := os.Stat(f.path) + if os.IsNotExist(err) { + return false, nil + } + + return err == nil, err +} + func (f *FileSystemStorageObjectProvider) Size(_ context.Context) (int64, error) { handle, err := f.getHandle(true) if err != nil { diff --git a/packages/shared/pkg/storage/storage_fs_test.go b/packages/shared/pkg/storage/storage_fs_test.go index e602fd5748..4d7ba710b9 100644 --- a/packages/shared/pkg/storage/storage_fs_test.go +++ b/packages/shared/pkg/storage/storage_fs_test.go @@ -6,6 +6,7 @@ import ( "path/filepath" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -20,11 +21,11 @@ func newTempProvider(t *testing.T) *FileSystemStorageProvider { return p } -func TestOpenObject_ReadWrite_Size_ReadAt(t *testing.T) { +func TestOpenObject_Write_Exists_WriteTo(t *testing.T) { p := newTempProvider(t) ctx := t.Context() - obj, err := p.OpenObject(ctx, filepath.Join("sub", "file.txt")) + obj, err := p.OpenObject(ctx, filepath.Join("sub", "file.txt"), MetadataObjectType) require.NoError(t, err) contents := []byte("hello world") @@ -34,9 +35,9 @@ func TestOpenObject_ReadWrite_Size_ReadAt(t *testing.T) { require.Equal(t, len(contents), n) // check Size - size, err := obj.Size(t.Context()) + exists, err := obj.Exists(t.Context()) require.NoError(t, err) - require.Equal(t, int64(len(contents)), size) + require.True(t, exists) // read the entire file back via WriteTo var buf bytes.Buffer @@ -44,13 +45,6 @@ func TestOpenObject_ReadWrite_Size_ReadAt(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(len(contents)), n64) require.Equal(t, contents, buf.Bytes()) - - // read a slice via ReadAt ("world") - part := make([]byte, 5) - nRead, err := obj.ReadAt(t.Context(), part, 6) - require.NoError(t, err) - require.Equal(t, 5, nRead) - require.Equal(t, []byte("world"), part) } func TestWriteFromFileSystem(t *testing.T) { @@ -62,7 +56,7 @@ func TestWriteFromFileSystem(t *testing.T) { const payload = "copy me please" require.NoError(t, os.WriteFile(srcPath, []byte(payload), 0o600)) - obj, err := p.OpenObject(ctx, "copy/dst.txt") + obj, err := p.OpenObject(ctx, "copy/dst.txt", UnknownObjectType) require.NoError(t, err) require.NoError(t, obj.WriteFromFileSystem(t.Context(), srcPath)) @@ -76,16 +70,23 @@ func TestDelete(t *testing.T) { p := newTempProvider(t) ctx := t.Context() - obj, err := p.OpenObject(ctx, "to/delete.txt") + obj, err := p.OpenObject(ctx, "to/delete.txt", 0) require.NoError(t, err) _, err = obj.Write(t.Context(), []byte("bye")) require.NoError(t, err) - require.NoError(t, obj.Delete(t.Context())) + + exists, err := obj.Exists(t.Context()) + require.NoError(t, err) + assert.True(t, exists) + + err = p.DeleteObjectsWithPrefix(t.Context(), "to/delete.txt") + require.NoError(t, err) // subsequent Size call should fail with ErrorObjectNotExist - _, err = obj.Size(t.Context()) - require.ErrorIs(t, err, ErrObjectNotExist) + exists, err = obj.Exists(t.Context()) + require.NoError(t, err) + assert.False(t, exists) } func TestDeleteObjectsWithPrefix(t *testing.T) { @@ -98,7 +99,7 @@ func TestDeleteObjectsWithPrefix(t *testing.T) { "data/sub/c.txt", } for _, pth := range paths { - obj, err := p.OpenObject(ctx, pth) + obj, err := p.OpenObject(ctx, pth, UnknownObjectType) require.NoError(t, err) _, err = obj.Write(t.Context(), []byte("x")) require.NoError(t, err) @@ -118,7 +119,7 @@ func TestWriteToNonExistentObject(t *testing.T) { p := newTempProvider(t) ctx := t.Context() - obj, err := p.OpenObject(ctx, "missing/file.txt") + obj, err := p.OpenObject(ctx, "missing/file.txt", UnknownObjectType) require.NoError(t, err) var sink bytes.Buffer diff --git a/packages/shared/pkg/storage/storage_google.go b/packages/shared/pkg/storage/storage_google.go index 51f4b127f2..0beed59eea 100644 --- a/packages/shared/pkg/storage/storage_google.go +++ b/packages/shared/pkg/storage/storage_google.go @@ -66,7 +66,10 @@ type GCPBucketStorageObjectProvider struct { limiter *limit.Limiter } -var _ StorageObjectProvider = (*GCPBucketStorageObjectProvider)(nil) +var ( + _ SeekableObjectProvider = (*GCPBucketStorageObjectProvider)(nil) + _ ObjectProvider = (*GCPBucketStorageObjectProvider)(nil) +) func NewGCPBucketStorageProvider(ctx context.Context, bucketName string, limiter *limit.Limiter) (*GCPBucketStorageProvider, error) { client, err := storage.NewClient(ctx) @@ -128,7 +131,29 @@ func (g *GCPBucketStorageProvider) UploadSignedURL(_ context.Context, path strin return url, nil } -func (g *GCPBucketStorageProvider) OpenObject(_ context.Context, path string) (StorageObjectProvider, error) { +func (g *GCPBucketStorageProvider) OpenSeekableObject(_ context.Context, path string, _ SeekableObjectType) (SeekableObjectProvider, error) { + handle := g.bucket.Object(path).Retryer( + storage.WithMaxAttempts(googleMaxAttempts), + storage.WithPolicy(storage.RetryAlways), + storage.WithBackoff( + gax.Backoff{ + Initial: googleInitialBackoff, + Max: googleMaxBackoff, + Multiplier: googleBackoffMultiplier, + }, + ), + ) + + return &GCPBucketStorageObjectProvider{ + storage: g, + path: path, + handle: handle, + + limiter: g.limiter, + }, nil +} + +func (g *GCPBucketStorageProvider) OpenObject(_ context.Context, path string, _ ObjectType) (ObjectProvider, error) { handle := g.bucket.Object(path).Retryer( storage.WithMaxAttempts(googleMaxAttempts), storage.WithPolicy(storage.RetryAlways), @@ -161,6 +186,12 @@ func (g *GCPBucketStorageObjectProvider) Delete(ctx context.Context) error { return nil } +func (g *GCPBucketStorageObjectProvider) Exists(ctx context.Context) (bool, error) { + _, err := g.Size(ctx) + + return err == nil, ignoreNotExists(err) +} + func (g *GCPBucketStorageObjectProvider) Size(ctx context.Context) (int64, error) { ctx, cancel := context.WithTimeout(ctx, googleOperationTimeout) defer cancel()