feat(repository): Metadata compression config support for indirect content #557

Merged
Changes from 3 commits
4 changes: 2 additions & 2 deletions internal/server/server_test.go
@@ -216,7 +216,7 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
 _, err := ow.Write([]byte{2, 3, 4})
 require.NoError(t, err)

-_, err = ow.Result()
+_, err = ow.Result("zstd-fastest")
 if err == nil {
 return errors.Errorf("unexpected success writing object with 'm' prefix")
 }
@@ -263,7 +263,7 @@ func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter,
 _, err := ow.Write(data)
 require.NoError(t, err)

-result, err := ow.Result()
+result, err := ow.Result("zstd-fastest")
 require.NoError(t, err)

 return result
4 changes: 2 additions & 2 deletions repo/blob/storage_extend_test.go
@@ -44,7 +44,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) {
 })
 w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
 io.WriteString(w, "hello world!")
-w.Result()
+w.Result("zstd-fastest")
 w.Close()

 env.RepositoryWriter.Flush(ctx)
@@ -105,7 +105,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing.
 })
 w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
 io.WriteString(w, "hello world!")
-w.Result()
+w.Result("zstd-fastest")
 w.Close()

 env.RepositoryWriter.Flush(ctx)
29 changes: 29 additions & 0 deletions repo/content/content_manager_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion repo/format/upgrade_lock_test.go
@@ -406,6 +406,6 @@ func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, d
 _, err := w.Write(data)
 require.NoError(t, err, testCaseID)

-_, err = w.Result()
+_, err = w.Result("zstd-fastest")
 require.NoError(t, err, testCaseID)
 }
4 changes: 2 additions & 2 deletions repo/grpc_repository_client.go
@@ -529,9 +529,9 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp
 }

 // ConcatenateObjects creates a concatenated object from the provided object IDs.
-func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
+func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) {
 //nolint:wrapcheck
-return r.omgr.Concatenate(ctx, objectIDs)
+return r.omgr.Concatenate(ctx, objectIDs, comp)
 }

 // maybeRetry executes the provided callback with or without automatic retries depending on how
2 changes: 1 addition & 1 deletion repo/maintenance/blob_gc_test.go
@@ -49,7 +49,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
 })
 w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
 io.WriteString(w, "hello world!")
-w.Result()
+w.Result("zstd-fastest")
 w.Close()

 env.RepositoryWriter.Flush(ctx)
4 changes: 2 additions & 2 deletions repo/maintenance/blob_retain_test.go
@@ -45,7 +45,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) {
 })
 w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
 io.WriteString(w, "hello world!")
-w.Result()
+w.Result("zstd-fastest")
 w.Close()

 env.RepositoryWriter.Flush(ctx)
@@ -100,7 +100,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing
 })
 w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
 io.WriteString(w, "hello world!")
-w.Result()
+w.Result("zstd-fastest")
 w.Close()

 env.RepositoryWriter.Flush(ctx)
4 changes: 2 additions & 2 deletions repo/maintenance/content_rewrite_test.go
@@ -81,7 +81,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
 require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
 ow := w.NewObjectWriter(ctx, object.WriterOptions{})
 fmt.Fprintf(ow, "%v", uuid.NewString())
-_, err := ow.Result()
+_, err := ow.Result("zstd-fastest")
 return err
 }))
 }
@@ -90,7 +90,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
 require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
 ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"})
 fmt.Fprintf(ow, "%v", uuid.NewString())
-_, err := ow.Result()
+_, err := ow.Result("zstd-fastest")
 return err
 }))
 }
4 changes: 2 additions & 2 deletions repo/maintenance/maintenance_safety_test.go
@@ -37,15 +37,15 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {
 ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
 fmt.Fprintf(ow, "hello world")
 var err error
-objectID, err = ow.Result()
+objectID, err = ow.Result("zstd-fastest")
 return err
 }))

 // create another object in separate pack.
 require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
 ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
 fmt.Fprintf(ow, "hello universe")
-_, err := ow.Result()
+_, err := ow.Result("zstd-fastest")
 return err
 }))

5 changes: 3 additions & 2 deletions repo/object/object_manager.go
@@ -106,7 +106,7 @@ func (om *Manager) closedWriter(ow *objectWriter) {
 // in parallel utilizing more CPU cores. Because some split points now fall at fixed boundaries rather than content-specific ones,
 // this causes a slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB),
 // so this method should only be used for very large files where this overhead is relatively small.
-func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) {
+func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, comp compression.Name) (ID, error) {
 if len(objectIDs) == 0 {
 return EmptyID, errors.Errorf("empty list of objects")
 }
@@ -133,14 +133,15 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error)
 w := om.NewWriter(ctx, WriterOptions{
 Prefix: indirectContentPrefix,
 Description: "CONCATENATED INDEX",
+Compressor: comp,
 })
 defer w.Close() //nolint:errcheck

 if werr := writeIndirectObject(w, concatenatedEntries); werr != nil {
 return EmptyID, werr
 }

-concatID, err := w.Result()
+concatID, err := w.Result(comp)
 if err != nil {
 return EmptyID, errors.Wrap(err, "error writing concatenated index")
 }
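Since Concatenate now also takes the metadata compression name, a caller stitching together separately uploaded parts passes it explicitly. The snippet below is a hedged sketch: the helper name and the parts slice are hypothetical, and "zstd-fastest" is simply the value used throughout this PR's tests.

```go
package example

import (
	"context"

	"github.com/kopia/kopia/repo/object"
)

// concatenateParts is a hypothetical helper; om is the *object.Manager whose
// Concatenate method is changed above, and parts are IDs of objects written
// in parallel (e.g. chunks of a very large file).
func concatenateParts(ctx context.Context, om *object.Manager, parts []object.ID) (object.ID, error) {
	// The new compression.Name argument controls how the concatenated
	// indirect index ("CONCATENATED INDEX") itself is compressed.
	return om.Concatenate(ctx, parts, "zstd-fastest")
}
```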
48 changes: 24 additions & 24 deletions repo/object/object_manager_test.go
@@ -141,7 +141,7 @@ func TestWriters(t *testing.T) {
 t.Errorf("write error: %v", err)
 }

-result, err := writer.Result()
+result, err := writer.Result("zstd-fastest")
 if err != nil {
 t.Errorf("error getting writer results for %v, expected: %v", c.data, c.objectID.String())
 continue
@@ -178,7 +178,7 @@ func TestCompression_ContentCompressionEnabled(t *testing.T) {
 Compressor: "gzip",
 })
 w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000))
-oid, err := w.Result()
+oid, err := w.Result("zstd-fastest")
 require.NoError(t, err)

 cid, isCompressed, ok := oid.ContentID()
@@ -219,7 +219,7 @@ func TestCompression_CustomSplitters(t *testing.T) {
 w := om.NewWriter(ctx, tc.wo)

 w.Write(bytes.Repeat([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 128<<10))
-oid, err := w.Result()
+oid, err := w.Result("zstd-fastest")
 require.NoError(t, err)

 ndx, ok := oid.IndexObjectID()
@@ -247,7 +247,7 @@ func TestCompression_ContentCompressionDisabled(t *testing.T) {
 Compressor: "gzip",
 })
 w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000))
-oid, err := w.Result()
+oid, err := w.Result("zstd-fastest")
 require.NoError(t, err)

 _, isCompressed, ok := oid.ContentID()
@@ -263,7 +263,7 @@ func TestWriterCompleteChunkInTwoWrites(t *testing.T) {
 writer := om.NewWriter(ctx, WriterOptions{})
 writer.Write(b[0:50])
 writer.Write(b[0:50])
-result, err := writer.Result()
+result, err := writer.Result("zstd-fastest")

 if !objectIDsEqual(result, mustParseID(t, "cd00e292c5970d3c5e2f0ffa5171e555bc46bfc4faddfb4a418b6840b86e79a3")) {
 t.Errorf("unexpected result: %v err: %v", result, err)
Expand All @@ -280,25 +280,25 @@ func TestCheckpointing(t *testing.T) {
allZeroes := make([]byte, 1<<20)

// empty file, nothing flushed
checkpoint1, err := writer.Checkpoint()
checkpoint1, err := writer.Checkpoint("zstd-fastest")
verifyNoError(t, err)

// write some bytes, but not enough to flush.
writer.Write(allZeroes[0:50])
checkpoint2, err := writer.Checkpoint()
checkpoint2, err := writer.Checkpoint("zstd-fastest")
verifyNoError(t, err)

// write enough to flush first content.
writer.Write(allZeroes)
checkpoint3, err := writer.Checkpoint()
checkpoint3, err := writer.Checkpoint("zstd-fastest")
verifyNoError(t, err)

// write enough to flush second content.
writer.Write(allZeroes)
checkpoint4, err := writer.Checkpoint()
checkpoint4, err := writer.Checkpoint("zstd-fastest")
verifyNoError(t, err)

result, err := writer.Result()
result, err := writer.Result("zstd-fastest")
verifyNoError(t, err)

if !objectIDsEqual(checkpoint1, EmptyID) {
Expand All @@ -309,7 +309,7 @@ func TestCheckpointing(t *testing.T) {
t.Errorf("unexpected checkpoint2: %v err: %v", checkpoint2, err)
}

result2, err := writer.Checkpoint()
result2, err := writer.Checkpoint("zstd-fastest")
verifyNoError(t, err)

if result2 != result {
Expand Down Expand Up @@ -354,13 +354,13 @@ func TestObjectWriterRaceBetweenCheckpointAndResult(t *testing.T) {
var eg errgroup.Group

eg.Go(func() error {
_, rerr := w.Result()
_, rerr := w.Result("zstd-fastest")

return rerr
})

eg.Go(func() error {
cpID, cperr := w.Checkpoint()
cpID, cperr := w.Checkpoint("zstd-fastest")
if cperr == nil && cpID != EmptyID {
ids, verr := VerifyObject(ctx, om.contentMgr, cpID)
if verr != nil {
@@ -467,7 +467,7 @@ func TestIndirection(t *testing.T) {
 t.Errorf("write error: %v", err)
 }

-result, err := writer.Result()
+result, err := writer.Result("zstd-fastest")
 if err != nil {
 t.Errorf("error getting writer results: %v", err)
 }
@@ -512,7 +512,7 @@ func TestHMAC(t *testing.T) {

 w := om.NewWriter(ctx, WriterOptions{})
 w.Write(c)
-result, err := w.Result()
+result, err := w.Result("zstd-fastest")

 if result.String() != "cad29ff89951a3c085c86cb7ed22b82b51f7bdfda24f932c7f9601f51d5975ba" {
 t.Errorf("unexpected result: %v err: %v", result.String(), err)
@@ -578,7 +578,7 @@ func TestConcatenate(t *testing.T) {
 }

 for _, tc := range cases {
-concatenatedOID, err := om.Concatenate(ctx, tc.inputs)
+concatenatedOID, err := om.Concatenate(ctx, tc.inputs, "zstd-fastest")
 if err != nil {
 t.Fatal(err)
 }
@@ -617,7 +617,7 @@ func TestConcatenate(t *testing.T) {
 }

 // make sure results of concatenation can be further concatenated.
-concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID})
+concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID}, "zstd-fastest")
 if err != nil {
 t.Fatal(err)
 }
@@ -645,7 +645,7 @@ func mustWriteObject(t *testing.T, om *Manager, data []byte, compressor compress
 _, err := w.Write(data)
 require.NoError(t, err)

-oid, err := w.Result()
+oid, err := w.Result("zstd-fastest")
 require.NoError(t, err)

 return oid
@@ -728,7 +728,7 @@ func TestEndToEndReadAndSeek(t *testing.T) {
 t.Errorf("write error: %v", err)
 }

-objectID, err := writer.Result()
+objectID, err := writer.Result("zstd-fastest")
 t.Logf("oid: %v", objectID)

 writer.Close()
@@ -784,7 +784,7 @@ func TestEndToEndReadAndSeekWithCompression(t *testing.T) {

 totalBytesWritten += size

-objectID, err := writer.Result()
+objectID, err := writer.Result("zstd-fastest")

 writer.Close()

@@ -876,7 +876,7 @@ func TestSeek(t *testing.T) {
 t.Errorf("write error: %v", err)
 }

-objectID, err := writer.Result()
+objectID, err := writer.Result("zstd-fastest")
 if err != nil {
 t.Fatalf("unable to write: %v", err)
 }
@@ -938,7 +938,7 @@ func TestWriterFlushFailure_OnFlush(t *testing.T) {

 fcm.writeContentError = errSomeError

-_, err = w.Result()
+_, err = w.Result("zstd-fastest")
 require.ErrorIs(t, err, errSomeError)
 }

@@ -951,7 +951,7 @@ func TestWriterFlushFailure_OnCheckpoint(t *testing.T) {
 w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1e6))

 fcm.writeContentError = errSomeError
-_, err := w.Checkpoint()
+_, err := w.Checkpoint("zstd-fastest")

 require.ErrorIs(t, err, errSomeError)
 }
@@ -970,7 +970,7 @@ func TestWriterFlushFailure_OnAsyncWrite(t *testing.T) {
 require.NotErrorIs(t, err, errSomeError)
 require.Equal(t, 4000000, n)

-_, err = w.Result()
+_, err = w.Result("zstd-fastest")
 require.ErrorIs(t, err, errSomeError)
 }