support putifabsent #8428

Open · wants to merge 37 commits into master

Commits (37)
d92b29d  works  (ItamarYuran, Dec 16, 2024)
f18a67c  multi if none mash  (ItamarYuran, Dec 17, 2024)
81de1fc  trimmmin' them  (ItamarYuran, Dec 17, 2024)
7d2a511  yalla  (ItamarYuran, Dec 17, 2024)
768ef8c  only if absent babe  (ItamarYuran, Dec 22, 2024)
b07a5ab  erroring  (ItamarYuran, Dec 22, 2024)
c8a508c  tests v1  (ItamarYuran, Dec 24, 2024)
81c8a79  tests v2  (ItamarYuran, Dec 24, 2024)
5fc97da  test v3  (ItamarYuran, Dec 24, 2024)
1dd06fd  test v4  (ItamarYuran, Dec 24, 2024)
a635bb4  test v5  (ItamarYuran, Dec 24, 2024)
717f224  test yalla  (ItamarYuran, Dec 24, 2024)
d558f57  user metadata  (ItamarYuran, Dec 25, 2024)
150776e  beginning s3 client  (ItamarYuran, Dec 25, 2024)
d7e22f4  beginning s3 client  (ItamarYuran, Dec 25, 2024)
4b1533b  local host  (ItamarYuran, Dec 25, 2024)
1528c90  local host  (ItamarYuran, Dec 25, 2024)
08cc251  amen  (ItamarYuran, Dec 25, 2024)
2873d3a  main/object  (ItamarYuran, Dec 25, 2024)
7e42c88  yalla  (ItamarYuran, Dec 25, 2024)
b46deb3  yalla kadima  (ItamarYuran, Dec 25, 2024)
126144f  debug  (ItamarYuran, Dec 25, 2024)
4a56ace  pront all headers  (ItamarYuran, Dec 25, 2024)
c4387d9  I really think it will work now  (ItamarYuran, Dec 26, 2024)
51d8105  this time bby  (ItamarYuran, Dec 26, 2024)
6f2431a  now is the time  (ItamarYuran, Dec 26, 2024)
047bd48  yalla  (ItamarYuran, Dec 26, 2024)
55a2d72  lets see  (ItamarYuran, Dec 26, 2024)
c620816  maybe now  (ItamarYuran, Dec 26, 2024)
69764b1  with multipart test  (ItamarYuran, Dec 26, 2024)
0b2e2b4  test will now pass  (ItamarYuran, Dec 26, 2024)
8b4d12f  svc to s3  (ItamarYuran, Dec 26, 2024)
67da789  smol change  (ItamarYuran, Dec 26, 2024)
005f174  its gonna work i tell u  (ItamarYuran, Dec 26, 2024)
278a345  nooooow  (ItamarYuran, Dec 26, 2024)
94ed11f  looks like we got it  (ItamarYuran, Dec 26, 2024)
e7508d5  formatted  (ItamarYuran, Dec 29, 2024)
4 changes: 2 additions & 2 deletions esti/multipart_test.go
@@ -52,7 +52,7 @@ func TestMultipartUpload(t *testing.T) {
partsConcat = append(partsConcat, parts[i]...)
}

completedParts := uploadMultipartParts(t, ctx, logger, resp, parts, 0)
completedParts := uploadMultipartParts(t, ctx, svc, logger, resp, parts, 0)

if isBlockstoreType(block.BlockstoreTypeS3) == nil {
// Object should have Last-Modified time at around time of MPU creation. Ensure
@@ -166,7 +166,7 @@ func reverse(s string) string {
return string(runes)
}

func uploadMultipartParts(t *testing.T, ctx context.Context, logger logging.Logger, resp *s3.CreateMultipartUploadOutput, parts [][]byte, firstIndex int) []types.CompletedPart {
func uploadMultipartParts(t *testing.T, ctx context.Context, svc *s3.Client, logger logging.Logger, resp *s3.CreateMultipartUploadOutput, parts [][]byte, firstIndex int) []types.CompletedPart {
Review comment (Contributor):
What's the reason for overriding the global svc and passing a custom client svc *s3.Client here?
(Even if it is required, please don't use the same name as the global svc, since it's confusing.)

count := len(parts)
completedParts := make([]types.CompletedPart, count)
errs := make([]error, count)
120 changes: 117 additions & 3 deletions esti/s3_gateway_test.go
@@ -4,9 +4,6 @@ import (
"bytes"
"context"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/go-openapi/swag"
"io"
"math/rand"
"net/http"
@@ -16,6 +13,14 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
"github.com/aws/smithy-go/middleware"
smithyhttp "github.com/aws/smithy-go/transport/http"
"github.com/go-openapi/swag"
"github.com/thanhpk/randstr"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/minio/minio-go/v7/pkg/tags"
@@ -181,6 +186,115 @@ func TestS3UploadAndDownload(t *testing.T) {
})
}
}
func TestMultipartUploadIfNoneMatch(t *testing.T) {
ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)
s3Endpoint := viper.GetString("s3_endpoint")
s3Client := createS3Client(s3Endpoint, t)
multipartNumberOfParts := 7
multipartPartSize := 5 * 1024 * 1024
type TestCase struct {
Path string
Content string
IfNoneMatch string
ExpectError bool
}

testCases := []TestCase{
{Path: "main/object1", Content: "data", IfNoneMatch: "", ExpectError: false},
{Path: "main/object1", Content: "data", IfNoneMatch: "*", ExpectError: true},
{Path: "main/object1", Content: "data", IfNoneMatch: "", ExpectError: false},
{Path: "main/object1", Content: "data", IfNoneMatch: "", ExpectError: false},
Review comment (Contributor) on lines +206 to +207:
Both lines seem redundant. Is there any other intention here, or should one of them just be deleted?

{Path: "main/object2", Content: "data", IfNoneMatch: "*", ExpectError: false},
Review comment (Contributor) on lines +204 to +208:
The Content field is not used, so why is it even initialized?

}
for i, tc := range testCases {
input := &s3.CreateMultipartUploadInput{
Bucket: aws.String(repo),
Key: aws.String(tc.Path),
}

resp, err := s3Client.CreateMultipartUpload(ctx, input)
require.NoError(t, err, "failed to create multipart upload")
logger.Info("Created multipart upload request")
Review comment (Contributor):
No need for this log here; it can spam the total output in the CI.
If the code did not reach this point it would already have failed at require.NoError(t, err, "failed to create multipart upload"), so the log is redundant.


parts := make([][]byte, multipartNumberOfParts)
for i := 0; i < multipartNumberOfParts; i++ {
parts[i] = randstr.Bytes(multipartPartSize + i)
}

completedParts := uploadMultipartParts(t, ctx, s3Client, logger, resp, parts, 0)

completeInput := &s3.CompleteMultipartUploadInput{
Bucket: resp.Bucket,
Key: resp.Key,
UploadId: resp.UploadId,
MultipartUpload: &types.CompletedMultipartUpload{
Parts: completedParts,
},
}
_, err = s3Client.CompleteMultipartUpload(ctx, completeInput, s3.WithAPIOptions(setHTTPHeaders(tc.IfNoneMatch)))
if tc.ExpectError {
require.Error(t, err, "was expecting an error with path %s and header %s in test case # %d", tc.Path, tc.IfNoneMatch, i+1)
Review comment (Contributor):
Part of the requirements is to return AWS errors, so let's assert specific errors.
Instead of ExpectError bool in the TestCase struct, use a substring.
For example, if I set the If-None-Match header to an invalid value (not *), say foo, then the error contains the string:
An error occurred (NotImplemented) when calling the PutObject operation: A header you provided implies functionality that is not implemented

So the require assertion should just check that the error-code part of the string is contained.

type TestCase struct {
   ExpectedError string
   // other fields ...
}

Then assert (example):

require.ErrorContains(t, err, tc.ExpectedError)
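Building on that suggestion, here is a hedged sketch of how the assertion branch inside the test loop could look (the ExpectedError field name and the message format are assumptions for illustration, not code from this PR):

// Assumes each TestCase carries ExpectedError string, where "" means no error is expected.
if tc.ExpectedError != "" {
	require.ErrorContains(t, err, tc.ExpectedError,
		"expected an error containing %q for path %s in test case #%d", tc.ExpectedError, tc.Path, i+1)
} else {
	require.NoError(t, err,
		"wasn't expecting an error for path %s in test case #%d", tc.Path, i+1)
}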

} else {
require.NoError(t, err, "wasn't expecting error with path %s and header %s in test case # %d", tc.Path, tc.IfNoneMatch, i+1)
}
}
}

func setHTTPHeaders(ifNoneMatch string) func(*middleware.Stack) error {
return func(stack *middleware.Stack) error {
return stack.Build.Add(middleware.BuildMiddlewareFunc("AddIfNoneMatchHeader", func(
ctx context.Context, in middleware.BuildInput, next middleware.BuildHandler,
) (
middleware.BuildOutput, middleware.Metadata, error,
) {
if req, ok := in.Request.(*smithyhttp.Request); ok {
// Add the If-None-Match header
req.Header.Set("If-None-Match", ifNoneMatch)
}
return next.HandleBuild(ctx, in)
}), middleware.Before)
}
}
func TestS3IfNoneMatch(t *testing.T) {

ctx, logger, repo := setupTest(t)
defer tearDownTest(repo)

s3Endpoint := viper.GetString("s3_endpoint")
s3Client := createS3Client(s3Endpoint, t)

type TestCase struct {
Path string
Content string
IfNoneMatch string
ExpectError bool
}

testCases := []TestCase{
{Path: "main/object1", Content: "data", IfNoneMatch: "", ExpectError: false},
{Path: "main/object1", Content: "data", IfNoneMatch: "*", ExpectError: true},
{Path: "main/object2", Content: "data", IfNoneMatch: "*", ExpectError: false},
{Path: "main/object2", Content: "data", IfNoneMatch: "", ExpectError: false},
{Path: "main/object2", Content: "data", IfNoneMatch: "*", ExpectError: true},
{Path: "main/object3", Content: "data", IfNoneMatch: "unsupported string", ExpectError: true},
}
for i, tc := range testCases {
input := &s3.PutObjectInput{
Bucket: aws.String(repo),
Key: aws.String(tc.Path),
Body: strings.NewReader(tc.Content),
}
logger.Info("Sending PutObject request for Path: %s with If-None-Match: %s\n", tc.Path, tc.IfNoneMatch)
Review comment (Contributor):
No need for a log message here, especially not at Info level, but really no need at all.
If PutObject fails you'll see it in the require assertion; if it doesn't, we don't care about the log.

We try to avoid this as much as possible because, in the context of the CI, all tests run together and there are tons of logs.

_, err := s3Client.PutObject(ctx, input, s3.WithAPIOptions(setHTTPHeaders(tc.IfNoneMatch)))

if tc.ExpectError {
require.Error(t, err, "was expecting an error with path %s and header %s in test case # %d", tc.Path, tc.IfNoneMatch, i+1)
} else {
require.NoError(t, err, "wasn't expecting error with path %s and header %s in test case # %d", tc.Path, tc.IfNoneMatch, i+1)
}
}
}

func verifyObjectInfo(t *testing.T, got minio.ObjectInfo, expectedSize int) {
if got.Err != nil {
5 changes: 3 additions & 2 deletions pkg/gateway/operations/operation_utils.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/treeverse/lakefs/pkg/catalog"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/logging"
)

@@ -40,7 +41,7 @@ func shouldReplaceMetadata(req *http.Request) bool {
return req.Header.Get(amzMetadataDirectiveHeaderPrefix) == "REPLACE"
}

func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string) error {
func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checksum, physicalAddress string, size int64, relative bool, metadata map[string]string, contentType string, allowOverWrite bool) error {
var writeTime time.Time
if mTime == nil {
writeTime = time.Now()
@@ -59,7 +60,7 @@ func (o *PathOperation) finishUpload(req *http.Request, mTime *time.Time, checks
ContentType(contentType).
Build()

err := o.Catalog.CreateEntry(req.Context(), o.Repository.Name, o.Reference, entry)
err := o.Catalog.CreateEntry(req.Context(), o.Repository.Name, o.Reference, entry, graveler.WithIfAbsent(!allowOverWrite))
if err != nil {
o.Log(req).WithError(err).Error("could not update metadata")
return err
22 changes: 21 additions & 1 deletion pkg/gateway/operations/postobject.go
@@ -10,6 +10,7 @@ import (
"time"

"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/catalog"
gatewayErrors "github.com/treeverse/lakefs/pkg/gateway/errors"
"github.com/treeverse/lakefs/pkg/gateway/multipart"
"github.com/treeverse/lakefs/pkg/gateway/path"
@@ -94,6 +95,21 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrInternalError))
return
}
// before completing multipart upload, check whether if-none-match header is added,
// in order to not overwrite object
allowOverwrite, err := o.checkIfAbsent(req)
if err != nil {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrNotImplemented))
return
}
if !allowOverwrite {
_, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{})
if err == nil {
// In case object exists in catalog, no error returns
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed))
return
}
}
objName := multiPart.PhysicalAddress
req = req.WithContext(logging.AddFields(req.Context(), logging.Fields{logging.PhysicalAddressFieldKey: objName}))
xmlMultipartComplete, err := io.ReadAll(req.Body)
@@ -124,7 +140,11 @@ func (controller *PostObject) HandleCompleteMultipartUpload(w http.ResponseWrite
return
}
checksum := strings.Split(resp.ETag, "-")[0]
err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType)
err = o.finishUpload(req, resp.MTime, checksum, objName, resp.ContentLength, true, multiPart.Metadata, multiPart.ContentType, allowOverwrite)
if errors.Is(err, graveler.ErrPreconditionFailed) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed))
return
}
if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
return
34 changes: 32 additions & 2 deletions pkg/gateway/operations/putobject.go
@@ -20,6 +20,7 @@ import (
)

const (
IfNoneMatchHeader = "If-None-Match"
CopySourceHeader = "x-amz-copy-source"
CopySourceRangeHeader = "x-amz-copy-source-range"
QueryParamUploadID = "uploadId"
@@ -30,7 +31,6 @@ type PutObject struct{}

func (controller *PutObject) RequiredPermissions(req *http.Request, repoID, _, destPath string) (permissions.Node, error) {
copySource := req.Header.Get(CopySourceHeader)

if len(copySource) == 0 {
return permissions.Node{
Permission: permissions.Permission{
@@ -298,6 +298,21 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) {
o.Incr("put_object", o.Principal, o.Repository.Name, o.Reference)
storageClass := StorageClassFromHeader(req.Header)
opts := block.PutOpts{StorageClass: storageClass}
// before uploading object, check whether if-none-match header is added,
// in order to not overwrite object
allowOverwrite, err := o.checkIfAbsent(req)
if err != nil {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrNotImplemented))
return
}
if !allowOverwrite {
_, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{})
if err == nil {
// In case object exists in catalog, no error returns
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed))
return
}
}
address := o.PathProvider.NewPath()
blob, err := upload.WriteBlob(req.Context(), o.BlockStore, o.Repository.StorageNamespace, address, req.Body, req.ContentLength, opts)
if err != nil {
@@ -309,7 +324,11 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) {
// write metadata
metadata := amzMetaAsMetadata(req)
contentType := req.Header.Get("Content-Type")
err = o.finishUpload(req, &blob.CreationDate, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType)
err = o.finishUpload(req, &blob.CreationDate, blob.Checksum, blob.PhysicalAddress, blob.Size, true, metadata, contentType, allowOverwrite)
if errors.Is(err, graveler.ErrPreconditionFailed) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrPreconditionFailed))
return
}
if errors.Is(err, graveler.ErrWriteToProtectedBranch) {
_ = o.EncodeError(w, req, err, gatewayErrors.Codes.ToAPIErr(gatewayErrors.ErrWriteToProtectedBranch))
return
@@ -325,3 +344,14 @@ func handlePut(w http.ResponseWriter, req *http.Request, o *PathOperation) {
o.SetHeader(w, "ETag", httputil.ETag(blob.Checksum))
w.WriteHeader(http.StatusOK)
}

func (o *PathOperation) checkIfAbsent(req *http.Request) (bool, error) {
Review comment (Contributor):
I find this function confusing because it does 3 different things:

  • It checks and validates the header value.
  • It gets the value from the catalog.
  • It returns an allowOverwrite indicator for the real code somewhere else that checks if-absent.

That is confusing because it's a "lie": it only partially checks if-absent as an optimization, since the real check is performed later, and it also does input validation.

I would prefer something much more explicit, like a function that extracts the header and validates it, followed by an inline check whether the object exists:

// checkIfAbsent sets allowOverwrite and validates the header value if set
allowOverwrite, err := o.checkIfAbsent(req)
if err != nil {
    // ...
}
if !allowOverwrite {
    // first check if the object exists, as an optimization to save resources
    _, err := o.Catalog.GetEntry(req.Context(), o.Repository.Name, o.Reference, o.Path, catalog.GetEntryParams{})
    // handle if err != nil ...
}
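To make that concrete, here is a minimal sketch of the header-only helper the reviewer describes; the name ifNoneMatchAllowsOverwrite is illustrative and not part of this PR, and it mirrors the validation logic of checkIfAbsent below:

// ifNoneMatchAllowsOverwrite validates the If-None-Match header and reports
// whether overwriting the object is allowed; it does not touch the catalog.
func ifNoneMatchAllowsOverwrite(req *http.Request) (bool, error) {
	switch req.Header.Get(IfNoneMatchHeader) {
	case "":
		return true, nil // header absent: regular overwrite semantics
	case "*":
		return false, nil // create only if the object does not already exist
	default:
		// only "*" is supported, matching S3 conditional-write behavior
		return false, gatewayErrors.ErrNotImplemented
	}
}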

headerValue := req.Header.Get(IfNoneMatchHeader)
if headerValue == "" {
return true, nil
}
if headerValue == "*" {
return false, nil
}
return false, gatewayErrors.ErrNotImplemented
}
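For context, here is a minimal client-side sketch of exercising the new behavior through the S3 gateway, reusing the setHTTPHeaders middleware from the tests above; the client setup, repo, and path values are placeholders rather than code from this PR:

// putIfAbsent creates an object only if it does not already exist, by sending
// If-None-Match: "*" with the request. client is an *s3.Client configured
// against the lakeFS S3 gateway endpoint, as in createS3Client in the tests.
func putIfAbsent(ctx context.Context, client *s3.Client, repo, path, content string) error {
	_, err := client.PutObject(ctx, &s3.PutObjectInput{
		Bucket: aws.String(repo),
		Key:    aws.String(path),
		Body:   strings.NewReader(content),
	}, s3.WithAPIOptions(setHTTPHeaders("*")))
	// If the object already exists, the gateway is expected to reject the request
	// with a PreconditionFailed error, which surfaces here as a non-nil err.
	return err
}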