Skip to content

Commit

Permalink
Pass metadata compressor as object manager field
Browse files Browse the repository at this point in the history
Signed-off-by: Prasad Ghangal <[email protected]>
  • Loading branch information
PrasadG193 committed Jul 26, 2024
1 parent a11a23b commit 4d2db76
Show file tree
Hide file tree
Showing 16 changed files with 153 additions and 137 deletions.
9 changes: 5 additions & 4 deletions internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,14 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
mustReadObject(ctx, t, w, result, written)

ow := w.NewObjectWriter(ctx, object.WriterOptions{
Prefix: content.ManifestContentPrefix,
Prefix: content.ManifestContentPrefix,
MetadataCompressor: "zstd-fastest",
})

_, err := ow.Write([]byte{2, 3, 4})
require.NoError(t, err)

_, err = ow.Result("zstd-fastest")
_, err = ow.Result()
if err == nil {
return errors.Errorf("unexpected success writing object with 'm' prefix")
}
Expand Down Expand Up @@ -258,12 +259,12 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID {
t.Helper()

ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

_, err := ow.Write(data)
require.NoError(t, err)

result, err := ow.Result("zstd-fastest")
result, err := ow.Result()
require.NoError(t, err)

return result
Expand Down
8 changes: 4 additions & 4 deletions repo/blob/storage_extend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result("zstd-fastest")
w.Result()
w.Close()

env.RepositoryWriter.Flush(ctx)
Expand Down Expand Up @@ -103,9 +103,9 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing.
nro.RetentionMode = ""
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result("zstd-fastest")
w.Result()
w.Close()

env.RepositoryWriter.Flush(ctx)
Expand Down
4 changes: 2 additions & 2 deletions repo/format/upgrade_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,11 +401,11 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) {
t.Helper()

w := rep.NewObjectWriter(ctx, object.WriterOptions{})
w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

_, err := w.Write(data)
require.NoError(t, err, testCaseID)

_, err = w.Result("zstd-fastest")
_, err = w.Result()
require.NoError(t, err, testCaseID)
}
4 changes: 2 additions & 2 deletions repo/maintenance/blob_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result("zstd-fastest")
w.Result()
w.Close()

env.RepositoryWriter.Flush(ctx)
Expand Down
8 changes: 4 additions & 4 deletions repo/maintenance/blob_retain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result("zstd-fastest")
w.Result()
w.Close()

env.RepositoryWriter.Flush(ctx)
Expand Down Expand Up @@ -98,9 +98,9 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result("zstd-fastest")
w.Result()
w.Close()

env.RepositoryWriter.Flush(ctx)
Expand Down
8 changes: 4 additions & 4 deletions repo/maintenance/content_rewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,18 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
// run N sessions to create N individual pack blobs for each content prefix
for range tc.numPContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result("zstd-fastest")
_, err := ow.Result()
return err
}))
}

for range tc.numQContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result("zstd-fastest")
_, err := ow.Result()
return err
}))
}
Expand Down
8 changes: 4 additions & 4 deletions repo/maintenance/maintenance_safety_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,18 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {

// create object that's immediately orphaned since nobody refers to it.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello world")
var err error
objectID, err = ow.Result("zstd-fastest")
objectID, err = ow.Result()
return err
}))

// create another object in separate pack.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello universe")
_, err := ow.Result("zstd-fastest")
_, err := ow.Result()
return err
}))

Expand Down
12 changes: 7 additions & 5 deletions repo/object/object_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
w.description = opt.Description
w.prefix = opt.Prefix
w.compressor = compression.ByName[opt.Compressor]
w.metadataCompressor = compression.ByName[opt.MetadataCompressor]
w.totalLength = 0
w.currentPosition = 0

Expand Down Expand Up @@ -106,7 +107,7 @@ func (om *Manager) closedWriter(ow *objectWriter) {
// in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific,
// this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB),
// so this method should only be used for very large files where this overhead is relatively small.
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, comp compression.Name) (ID, error) {
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp compression.Name) (ID, error) {
if len(objectIDs) == 0 {
return EmptyID, errors.Errorf("empty list of objects")
}
Expand All @@ -131,17 +132,18 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, comp compres
log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength)

w := om.NewWriter(ctx, WriterOptions{
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Compressor: comp,
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Compressor: metadataComp,
MetadataCompressor: metadataComp,
})
defer w.Close() //nolint:errcheck

if werr := writeIndirectObject(w, concatenatedEntries); werr != nil {
return EmptyID, werr
}

concatID, err := w.Result(comp)
concatID, err := w.Result()
if err != nil {
return EmptyID, errors.Wrap(err, "error writing concatenated index")
}
Expand Down
52 changes: 28 additions & 24 deletions repo/object/object_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestWriters(t *testing.T) {
t.Errorf("write error: %v", err)
}

result, err := writer.Result("zstd-fastest")
result, err := writer.Result()
if err != nil {
t.Errorf("error getting writer results for %v, expected: %v", c.data, c.objectID.String())
continue
Expand Down Expand Up @@ -175,13 +175,15 @@ func TestCompression_ContentCompressionEnabled(t *testing.T) {
_, _, om := setupTest(t, cmap)

w := om.NewWriter(ctx, WriterOptions{
Compressor: "gzip",
Compressor: "gzip",
MetadataCompressor: "zstd-fastest",
})
w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000))
oid, err := w.Result("zstd-fastest")
oid, err := w.Result()
require.NoError(t, err)

cid, isCompressed, ok := oid.ContentID()

require.True(t, ok)
require.False(t, isCompressed) // oid will not indicate compression
require.Equal(t, compression.ByName["gzip"].HeaderID(), cmap[cid])
Expand Down Expand Up @@ -219,7 +221,7 @@ func TestCompression_CustomSplitters(t *testing.T) {
w := om.NewWriter(ctx, tc.wo)

w.Write(bytes.Repeat([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}, 128<<10))
oid, err := w.Result("zstd-fastest")
oid, err := w.Result()
require.NoError(t, err)

ndx, ok := oid.IndexObjectID()
Expand All @@ -244,10 +246,11 @@ func TestCompression_ContentCompressionDisabled(t *testing.T) {
_, _, om := setupTest(t, nil)

w := om.NewWriter(ctx, WriterOptions{
Compressor: "gzip",
Compressor: "gzip",
MetadataCompressor: "zstd-fastest",
})
w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000))
oid, err := w.Result("zstd-fastest")
oid, err := w.Result()
require.NoError(t, err)

_, isCompressed, ok := oid.ContentID()
Expand All @@ -263,7 +266,7 @@ func TestWriterCompleteChunkInTwoWrites(t *testing.T) {
writer := om.NewWriter(ctx, WriterOptions{})
writer.Write(b[0:50])
writer.Write(b[0:50])
result, err := writer.Result("zstd-fastest")
result, err := writer.Result()

if !objectIDsEqual(result, mustParseID(t, "cd00e292c5970d3c5e2f0ffa5171e555bc46bfc4faddfb4a418b6840b86e79a3")) {
t.Errorf("unexpected result: %v err: %v", result, err)
Expand All @@ -280,25 +283,25 @@ func TestCheckpointing(t *testing.T) {
allZeroes := make([]byte, 1<<20)

// empty file, nothing flushed
checkpoint1, err := writer.Checkpoint("zstd-fastest")
checkpoint1, err := writer.Checkpoint()
verifyNoError(t, err)

// write some bytes, but not enough to flush.
writer.Write(allZeroes[0:50])
checkpoint2, err := writer.Checkpoint("zstd-fastest")
checkpoint2, err := writer.Checkpoint()
verifyNoError(t, err)

// write enough to flush first content.
writer.Write(allZeroes)
checkpoint3, err := writer.Checkpoint("zstd-fastest")
checkpoint3, err := writer.Checkpoint()
verifyNoError(t, err)

// write enough to flush second content.
writer.Write(allZeroes)
checkpoint4, err := writer.Checkpoint("zstd-fastest")
checkpoint4, err := writer.Checkpoint()
verifyNoError(t, err)

result, err := writer.Result("zstd-fastest")
result, err := writer.Result()
verifyNoError(t, err)

if !objectIDsEqual(checkpoint1, EmptyID) {
Expand All @@ -309,7 +312,7 @@ func TestCheckpointing(t *testing.T) {
t.Errorf("unexpected checkpoint2: %v err: %v", checkpoint2, err)
}

result2, err := writer.Checkpoint("zstd-fastest")
result2, err := writer.Checkpoint()
verifyNoError(t, err)

if result2 != result {
Expand Down Expand Up @@ -354,13 +357,13 @@ func TestObjectWriterRaceBetweenCheckpointAndResult(t *testing.T) {
var eg errgroup.Group

eg.Go(func() error {
_, rerr := w.Result("zstd-fastest")
_, rerr := w.Result()

return rerr
})

eg.Go(func() error {
cpID, cperr := w.Checkpoint("zstd-fastest")
cpID, cperr := w.Checkpoint()
if cperr == nil && cpID != EmptyID {
ids, verr := VerifyObject(ctx, om.contentMgr, cpID)
if verr != nil {
Expand Down Expand Up @@ -467,7 +470,8 @@ func TestIndirection(t *testing.T) {
t.Errorf("write error: %v", err)
}

result, err := writer.Result("zstd-fastest")
// Disable metadata compression here
result, err := writer.Result()
if err != nil {
t.Errorf("error getting writer results: %v", err)
}
Expand Down Expand Up @@ -512,7 +516,7 @@ func TestHMAC(t *testing.T) {

w := om.NewWriter(ctx, WriterOptions{})
w.Write(c)
result, err := w.Result("zstd-fastest")
result, err := w.Result()

if result.String() != "cad29ff89951a3c085c86cb7ed22b82b51f7bdfda24f932c7f9601f51d5975ba" {
t.Errorf("unexpected result: %v err: %v", result.String(), err)
Expand Down Expand Up @@ -645,7 +649,7 @@ func mustWriteObject(t *testing.T, om *Manager, data []byte, compressor compress
_, err := w.Write(data)
require.NoError(t, err)

oid, err := w.Result("zstd-fastest")
oid, err := w.Result()
require.NoError(t, err)

return oid
Expand Down Expand Up @@ -728,7 +732,7 @@ func TestEndToEndReadAndSeek(t *testing.T) {
t.Errorf("write error: %v", err)
}

objectID, err := writer.Result("zstd-fastest")
objectID, err := writer.Result()
t.Logf("oid: %v", objectID)

writer.Close()
Expand Down Expand Up @@ -784,7 +788,7 @@ func TestEndToEndReadAndSeekWithCompression(t *testing.T) {

totalBytesWritten += size

objectID, err := writer.Result("zstd-fastest")
objectID, err := writer.Result()

writer.Close()

Expand Down Expand Up @@ -876,7 +880,7 @@ func TestSeek(t *testing.T) {
t.Errorf("write error: %v", err)
}

objectID, err := writer.Result("zstd-fastest")
objectID, err := writer.Result()
if err != nil {
t.Fatalf("unable to write: %v", err)
}
Expand Down Expand Up @@ -938,7 +942,7 @@ func TestWriterFlushFailure_OnFlush(t *testing.T) {

fcm.writeContentError = errSomeError

_, err = w.Result("zstd-fastest")
_, err = w.Result()
require.ErrorIs(t, err, errSomeError)
}

Expand All @@ -951,7 +955,7 @@ func TestWriterFlushFailure_OnCheckpoint(t *testing.T) {
w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1e6))

fcm.writeContentError = errSomeError
_, err := w.Checkpoint("zstd-fastest")
_, err := w.Checkpoint()

require.ErrorIs(t, err, errSomeError)
}
Expand All @@ -970,7 +974,7 @@ func TestWriterFlushFailure_OnAsyncWrite(t *testing.T) {
require.NotErrorIs(t, err, errSomeError)
require.Equal(t, 4000000, n)

_, err = w.Result("zstd-fastest")
_, err = w.Result()
require.ErrorIs(t, err, errSomeError)
}

Expand Down
Loading

0 comments on commit 4d2db76

Please sign in to comment.