diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 4397fd1a59c..6602cb830e1 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -88,7 +88,7 @@ func (f *fakeContentManager) ContentInfo(ctx context.Context, contentID content. defer f.mu.Unlock() if d, ok := f.data[contentID]; ok { - return content.Info{ContentID: contentID, PackedLength: uint32(len(d))}, nil + return content.Info{ContentID: contentID, PackedLength: uint32(len(d)), CompressionHeaderID: f.compresionIDs[contentID]}, nil } return content.Info{}, blob.ErrBlobNotFound @@ -189,6 +189,29 @@ func TestCompression_ContentCompressionEnabled(t *testing.T) { require.Equal(t, compression.ByName["gzip"].HeaderID(), cmap[cid]) } +func TestCompression_IndirectContentCompressionEnabledMetadata(t *testing.T) { + ctx := testlogging.Context(t) + + cmap := map[content.ID]compression.HeaderID{} + _, _, om := setupTest(t, cmap) + w := om.NewWriter(ctx, WriterOptions{ + Compressor: "gzip", + MetadataCompressor: "zstd-fastest", + }) + w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000000)) + oid, err := w.Result() + require.NoError(t, err) + verifyIndirectBlock(ctx, t, om, oid, compression.HeaderZstdFastest) + + w2 := om.NewWriter(ctx, WriterOptions{ + MetadataCompressor: "none", + }) + w2.Write(bytes.Repeat([]byte{5, 6, 7, 8}, 1000000)) + oid2, err2 := w2.Result() + require.NoError(t, err2) + verifyIndirectBlock(ctx, t, om, oid2, content.NoCompression) +} + func TestCompression_CustomSplitters(t *testing.T) { cases := []struct { wo WriterOptions @@ -412,7 +435,7 @@ func verifyNoError(t *testing.T, err error) { require.NoError(t, err) } -func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID) { +func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID, expectedComp compression.HeaderID) { t.Helper() for indexContentID, isIndirect := oid.IndexObjectID(); isIndirect; indexContentID, isIndirect = indexContentID.IndexObjectID() { @@ -421,6 +444,11 @@ func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID) if !c.HasPrefix() { t.Errorf("expected base content ID to be prefixed, was %v", c) } + info, err := om.contentMgr.ContentInfo(ctx, c) + if err != nil { + t.Errorf("error getting content info for %v", err.Error()) + } + require.Equal(t, expectedComp, info.CompressionHeaderID) } rd, err := Open(ctx, om.contentMgr, indexContentID) @@ -459,11 +487,12 @@ func TestIndirection(t *testing.T) { } for _, c := range cases { - data, _, om := setupTest(t, nil) + cmap := map[content.ID]compression.HeaderID{} + data, _, om := setupTest(t, cmap) contentBytes := make([]byte, c.dataLength) - writer := om.NewWriter(ctx, WriterOptions{}) + writer := om.NewWriter(ctx, WriterOptions{MetadataCompressor: "zstd-fastest", Compressor: "gzip"}) writer.(*objectWriter).splitter = splitterFactory() if _, err := writer.Write(contentBytes); err != nil { @@ -494,7 +523,7 @@ func TestIndirection(t *testing.T) { t.Errorf("invalid blob count for %v, got %v, wanted %v", result, got, want) } - verifyIndirectBlock(ctx, t, om, result) + verifyIndirectBlock(ctx, t, om, result, content.NoCompression) } } diff --git a/snapshot/snapshotfs/upload_test.go b/snapshot/snapshotfs/upload_test.go index 934c0a9b4f5..b82325b8e02 100644 --- a/snapshot/snapshotfs/upload_test.go +++ b/snapshot/snapshotfs/upload_test.go @@ -38,6 +38,8 @@ import ( "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob/filesystem" bloblogging "github.com/kopia/kopia/repo/blob/logging" + "github.com/kopia/kopia/repo/compression" + "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/object" "github.com/kopia/kopia/snapshot" @@ -228,6 +230,106 @@ func TestUpload(t *testing.T) { } } +type entry struct { + name string + objectID object.ID +} + +// findAllEntries recursively iterates over all the dirs and returns list of file entries +func findAllEntries(t *testing.T, ctx context.Context, dir fs.Directory) []entry { + entries := []entry{} + fs.IterateEntries(ctx, dir, func(ctx context.Context, e fs.Entry) error { + oid, err := object.ParseID(e.(object.HasObjectID).ObjectID().String()) + require.NoError(t, err) + entries = append(entries, entry{ + name: e.Name(), + objectID: oid, + }) + if e.IsDir() { + entries = append(entries, findAllEntries(t, ctx, e.(fs.Directory))...) + } + return nil + }) + return entries +} + +func verifyMetadataCompressor(t *testing.T, ctx context.Context, rep repo.Repository, entries []entry, comp compression.HeaderID) { + for _, e := range entries { + cid, _, ok := e.objectID.ContentID() + require.True(t, ok) + if !cid.HasPrefix() { + continue + } + info, err := rep.ContentInfo(ctx, cid) + if err != nil { + t.Errorf("failed to get content info: %v", err) + } + require.Equal(t, comp, info.CompressionHeaderID) + } +} + +func TestUploadMetadataCompression(t *testing.T) { + ctx := testlogging.Context(t) + t.Run("default metadata compression", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + policyTree := policy.BuildTree(nil, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, compression.HeaderZstdFastest) + }) + t.Run("disable metadata compression", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + // policyTree := policy.BuildTree(nil, policy.DefaultPolicy) + policyTree := policy.BuildTree(map[string]*policy.Policy{ + ".": { + MetadataCompressionPolicy: policy.MetadataCompressionPolicy{ + CompressorName: "none", + }, + }, + }, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, content.NoCompression) + }) + t.Run("set metadata compressor", func(t *testing.T) { + th := newUploadTestHarness(ctx, t) + defer th.cleanup() + u := NewUploader(th.repo) + policyTree := policy.BuildTree(map[string]*policy.Policy{ + ".": { + MetadataCompressionPolicy: policy.MetadataCompressionPolicy{ + CompressorName: "gzip", + }, + }, + }, policy.DefaultPolicy) + + s1, err := u.Upload(ctx, th.sourceDir, policyTree, snapshot.SourceInfo{}) + if err != nil { + t.Errorf("Upload error: %v", err) + } + + dir := EntryFromDirEntry(th.repo, s1.RootEntry).(fs.Directory) + entries := findAllEntries(t, ctx, dir) + verifyMetadataCompressor(t, ctx, th.repo, entries, compression.ByName["gzip"].HeaderID()) + }) +} + func TestUpload_TopLevelDirectoryReadFailure(t *testing.T) { ctx := testlogging.Context(t) th := newUploadTestHarness(ctx, t)