Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion pkg/block/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,6 @@ type Adapter interface {
Exists(ctx context.Context, obj ObjectPointer) (bool, error)
GetRange(ctx context.Context, obj ObjectPointer, startPosition int64, endPosition int64) (io.ReadCloser, error)
GetProperties(ctx context.Context, obj ObjectPointer) (Properties, error)
Remove(ctx context.Context, obj ObjectPointer) error
Copy(ctx context.Context, sourceObj, destinationObj ObjectPointer) error

CreateMultiPartUpload(ctx context.Context, obj ObjectPointer, r *http.Request, opts CreateMultiPartUploadOpts) (*CreateMultiPartUploadResponse, error)
Expand Down
17 changes: 0 additions & 17 deletions pkg/block/azure/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,23 +445,6 @@ func calcETag(contentMD5 []byte, etag *azcore.ETag) string {
return ""
}

func (a *Adapter) Remove(ctx context.Context, obj block.ObjectPointer) error {
var err error
defer reportMetrics("Remove", obj.StorageID, time.Now(), nil, &err)
qualifiedKey, err := resolveBlobURLInfo(obj)
if err != nil {
return err
}
containerClient, err := a.clientCache.NewContainerClient(qualifiedKey.StorageAccountName, qualifiedKey.ContainerName)
if err != nil {
return err
}
blobURL := containerClient.NewBlobClient(qualifiedKey.BlobURL)

_, err = blobURL.Delete(ctx, nil)
return err
}

func (a *Adapter) Copy(ctx context.Context, sourceObj, destinationObj block.ObjectPointer) error {
var err error
defer reportMetrics("Copy", sourceObj.StorageID, time.Now(), nil, &err)
Expand Down
112 changes: 0 additions & 112 deletions pkg/block/blocktest/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/url"
"path"
"path/filepath"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -248,117 +247,6 @@ func expectedURLExp(adapter block.Adapter) time.Time {
}
}

// tests the GetProperties method of the adapter, verifying ETag population and consistency with Walker
func testGetProperties(t *testing.T, adapter block.Adapter, storageNamespace string) {
ctx := context.Background()

t.Run("properties_with_etag", func(t *testing.T) {
const contents = "test content for etag"
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "test_file_with_etag",
IdentifierType: block.IdentifierTypeRelative,
}

// Put the object
_, err := adapter.Put(ctx, obj, int64(len(contents)), strings.NewReader(contents), block.PutOpts{})
require.NoError(t, err)

// Get properties
props, err := adapter.GetProperties(ctx, obj)
require.NoError(t, err)

// Verify ETag is populated
require.NotEmpty(t, props.ETag, "ETag should be populated")

// Verify ETag format - should not contain quotes (for most adapters)
// Note: Some adapters may have different formats, but none should have quotes
require.NotContains(t, props.ETag, "\"", "ETag should not contain quotes")

// Verify other properties
require.False(t, props.LastModified.IsZero(), "LastModified should be set")
})

t.Run("etag_consistency_with_walker", func(t *testing.T) {
const contents = "test content for walker consistency"
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "test_file_walker_consistency",
IdentifierType: block.IdentifierTypeRelative,
}

// Put the object
_, err := adapter.Put(ctx, obj, int64(len(contents)), strings.NewReader(contents), block.PutOpts{})
require.NoError(t, err)

// Get properties
props, err := adapter.GetProperties(ctx, obj)
require.NoError(t, err)

// Get walker and compare ETag
qk, err := adapter.ResolveNamespace(obj.StorageID, obj.StorageNamespace, "", block.IdentifierTypeRelative)
require.NoError(t, err)
uri, err := url.Parse(qk.Format())
require.NoError(t, err)

walker, err := adapter.GetWalker(obj.StorageID, block.WalkerOptions{StorageURI: uri})
require.NoError(t, err)

var walkerETag string
err = walker.Walk(ctx, uri, block.WalkOptions{}, func(e block.ObjectStoreEntry) error {
if strings.HasSuffix(e.FullKey, "test_file_walker_consistency") {
walkerETag = e.ETag
}
return nil
})
require.NoError(t, err)

// Verify that GetProperties ETag matches Walker ETag
require.Equal(t, walkerETag, props.ETag, "GetProperties ETag should match Walker ETag")
})

t.Run("not_found", func(t *testing.T) {
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "non_existent_file",
IdentifierType: block.IdentifierTypeRelative,
}

// Get properties for non-existent object
_, err := adapter.GetProperties(ctx, obj)
require.Error(t, err)
require.ErrorIs(t, err, block.ErrDataNotFound)
})

t.Run("properties_stability", func(t *testing.T) {
const contents = "test content for stability"
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: "test_file_stability",
IdentifierType: block.IdentifierTypeRelative,
}

// Put the object
_, err := adapter.Put(ctx, obj, int64(len(contents)), strings.NewReader(contents), block.PutOpts{})
require.NoError(t, err)

// Get properties multiple times
props1, err := adapter.GetProperties(ctx, obj)
require.NoError(t, err)

props2, err := adapter.GetProperties(ctx, obj)
require.NoError(t, err)

// Verify properties are consistent across calls
require.Equal(t, props1.ETag, props2.ETag, "ETag should be consistent across calls")
require.Equal(t, props1.LastModified, props2.LastModified, "LastModified should be consistent")
})
}

func dumpPathTree(t testing.TB, ctx context.Context, adapter block.Adapter, qk block.QualifiedKey) []string {
t.Helper()
tree := make([]string, 0)
Expand Down
79 changes: 0 additions & 79 deletions pkg/block/blocktest/basic_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"strings"
"testing"

"github.com/go-test/deep"
"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/block"
)
Expand All @@ -15,7 +14,6 @@ import (
func AdapterBasicObjectTest(t *testing.T, adapter block.Adapter, storageNamespace, externalPath string) {
t.Run("Adapter_PutGet", func(t *testing.T) { testAdapterPutGet(t, adapter, storageNamespace, externalPath) })
t.Run("Adapter_Copy", func(t *testing.T) { testAdapterCopy(t, adapter, storageNamespace) })
t.Run("Adapter_Remove", func(t *testing.T) { testAdapterRemove(t, adapter, storageNamespace) })
t.Run("Adapter_Exists", func(t *testing.T) { testAdapterExists(t, adapter, storageNamespace) })
}

Expand Down Expand Up @@ -89,83 +87,6 @@ func testAdapterCopy(t *testing.T, adapter block.Adapter, storageNamespace strin
require.Equal(t, contents, string(got))
}

// Parameterized test to test valid and invalid cases for Removing an object via the adaptor
func testAdapterRemove(t *testing.T, adapter block.Adapter, storageNamespace string) {
ctx := context.Background()
const content = "Content used for testing"
tests := []struct {
name string
additionalObjects []string
path string
wantErr bool
wantTree []string
}{
{
name: "test_single",
path: "README",
wantErr: false,
wantTree: []string{},
},

{
name: "test_under_folder",
path: "src/tools.go",
wantErr: false,
wantTree: []string{},
},
{
name: "test_under_multiple_folders",
path: "a/b/c/d.txt",
wantErr: false,
wantTree: []string{},
},
{
name: "file_in_the_way",
path: "a/b/c/d.txt",
additionalObjects: []string{"a/b/blocker.txt"},
wantErr: false,
wantTree: []string{"/a/b/blocker.txt"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// setup env
envObjects := tt.additionalObjects
envObjects = append(envObjects, tt.path)
for _, p := range envObjects {
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: tt.name + "/" + p,
IdentifierType: block.IdentifierTypeRelative,
}
_, err := adapter.Put(ctx, obj, int64(len(content)), strings.NewReader(content), block.PutOpts{})
require.NoError(t, err)
}

// test Remove
obj := block.ObjectPointer{
StorageID: "",
StorageNamespace: storageNamespace,
Identifier: tt.name + "/" + tt.path,
IdentifierType: block.IdentifierTypeRelative,
}
if err := adapter.Remove(ctx, obj); (err != nil) != tt.wantErr {
t.Errorf("Remove() error = %v, wantErr %v", err, tt.wantErr)
}

qk, err := adapter.ResolveNamespace("", storageNamespace, tt.name, block.IdentifierTypeRelative)
require.NoError(t, err)

tree := dumpPathTree(t, ctx, adapter, qk)
if diff := deep.Equal(tt.wantTree, tree); diff != nil {
t.Errorf("Remove() tree diff = %s", diff)
}
})
}
}

// Parameterized test of the object Exists method of the Storage adapter
func testAdapterExists(t *testing.T, adapter block.Adapter, storageNamespace string) {
// TODO (niro): Test abs paths
Expand Down
14 changes: 0 additions & 14 deletions pkg/block/gs/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,20 +325,6 @@ func (a *Adapter) GetProperties(ctx context.Context, obj block.ObjectPointer) (b
return props, nil
}

func (a *Adapter) Remove(ctx context.Context, obj block.ObjectPointer) error {
var err error
defer reportMetrics("Remove", obj.StorageID, time.Now(), nil, &err)
bucket, key, err := a.extractParamsFromObj(obj)
if err != nil {
return err
}
err = a.client.Bucket(bucket).Object(key).Delete(ctx)
if err != nil {
return fmt.Errorf("Object(%q).Delete: %w", key, err)
}
return nil
}

func (a *Adapter) Copy(ctx context.Context, sourceObj, destinationObj block.ObjectPointer) error {
var err error
defer reportMetrics("Copy", sourceObj.StorageID, time.Now(), nil, &err)
Expand Down
37 changes: 0 additions & 37 deletions pkg/block/local/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,43 +170,6 @@ func (l *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reade
return &block.PutResponse{}, nil
}

func (l *Adapter) Remove(_ context.Context, obj block.ObjectPointer) error {
p, err := l.extractParamsFromObj(obj)
if err != nil {
return err
}
p = filepath.Clean(p)
err = os.Remove(p)
if err != nil {
return err
}
if l.removeEmptyDir {
dir := filepath.Dir(p)
repoRoot := obj.StorageNamespace[len(DefaultNamespacePrefix):]
removeEmptyDirUntil(dir, path.Join(l.path, repoRoot))
}
return nil
}

func removeEmptyDirUntil(dir string, stopAt string) {
if stopAt == "" {
return
}
if !strings.HasSuffix(stopAt, "/") {
stopAt += "/"
}
for strings.HasPrefix(dir, stopAt) && dir != stopAt {
err := os.Remove(dir)
if err != nil {
break
}
dir = filepath.Dir(dir)
if dir == "/" {
break
}
}
}

func (l *Adapter) Copy(_ context.Context, sourceObj, destinationObj block.ObjectPointer) error {
source, err := l.extractParamsFromObj(sourceObj)
if err != nil {
Expand Down
17 changes: 0 additions & 17 deletions pkg/block/mem/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,23 +233,6 @@ func (a *Adapter) GetProperties(_ context.Context, obj block.ObjectPointer) (blo
return props, nil
}

func (a *Adapter) Remove(_ context.Context, obj block.ObjectPointer) error {
if err := verifyObjectPointer(obj); err != nil {
return err
}
a.mutex.Lock()
defer a.mutex.Unlock()
storageID := obj.StorageID
key := getKey(obj)
if a.data[storageID] != nil {
delete(a.data[storageID], key)
}
if a.properties[storageID] != nil {
delete(a.properties[storageID], key)
}
return nil
}

func (a *Adapter) Copy(_ context.Context, sourceObj, destinationObj block.ObjectPointer) error {
if err := verifyObjectPointer(sourceObj); err != nil {
return err
Expand Down
9 changes: 0 additions & 9 deletions pkg/block/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,6 @@ func (m *MetricsAdapter) GetProperties(ctx context.Context, obj ObjectPointer) (
return m.adapter.GetProperties(ctx, obj)
}

func (m *MetricsAdapter) Remove(ctx context.Context, obj ObjectPointer) error {
blockstoreType := m.adapter.BlockstoreType()
const operation = "remove"
concurrentOperations.WithLabelValues(operation, blockstoreType).Inc()
defer concurrentOperations.WithLabelValues(operation, blockstoreType).Dec()
ctx = httputil.SetClientTrace(ctx, blockstoreType)
return m.adapter.Remove(ctx, obj)
}

func (m *MetricsAdapter) Copy(ctx context.Context, sourceObj, destinationObj ObjectPointer) error {
blockstoreType := m.adapter.BlockstoreType()
const operation = "copy"
Expand Down
28 changes: 0 additions & 28 deletions pkg/block/s3/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,34 +592,6 @@ func (a *Adapter) GetProperties(ctx context.Context, obj block.ObjectPointer) (b
}, nil
}

func (a *Adapter) Remove(ctx context.Context, obj block.ObjectPointer) error {
var err error
defer reportMetrics("Remove", obj.StorageID, time.Now(), nil, &err)
bucket, key, _, err := a.extractParamsFromObj(obj)
if err != nil {
return err
}

deleteInput := &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
}
client := a.clients.Get(ctx, bucket)
_, err = client.DeleteObject(ctx, deleteInput)
if err != nil {
a.log(ctx).WithError(err).Error("failed to delete S3 object")
return err
}

headInput := &s3.HeadObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(key),
}
const maxWaitDur = 100 * time.Second
waiter := s3.NewObjectNotExistsWaiter(client)
return waiter.Wait(ctx, headInput, maxWaitDur)
}

func (a *Adapter) copyPart(ctx context.Context, sourceObj, destinationObj block.ObjectPointer, uploadID string, partNumber int, byteRange *string) (*block.UploadPartResponse, error) {
srcKey, err := resolveNamespace(sourceObj)
if err != nil {
Expand Down
Loading