Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(repository): Metadata compression config support for indirect content #557

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
mustReadObject(ctx, t, w, result, written)

ow := w.NewObjectWriter(ctx, object.WriterOptions{
Prefix: content.ManifestContentPrefix,
Prefix: content.ManifestContentPrefix,
MetadataCompressor: "zstd-fastest",
})

_, err := ow.Write([]byte{2, 3, 4})
Expand Down Expand Up @@ -258,7 +259,7 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID {
t.Helper()

ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

_, err := ow.Write(data)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions repo/blob/storage_extend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down Expand Up @@ -103,7 +103,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing.
nro.RetentionMode = ""
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
29 changes: 29 additions & 0 deletions repo/content/content_manager_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion repo/format/upgrade_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) {
t.Helper()

w := rep.NewObjectWriter(ctx, object.WriterOptions{})
w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

_, err := w.Write(data)
require.NoError(t, err, testCaseID)
Expand Down
4 changes: 2 additions & 2 deletions repo/grpc_repository_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,9 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp
}

// ConcatenateObjects creates a concatenated object from the provided object IDs.
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) {
//nolint:wrapcheck
return r.omgr.Concatenate(ctx, objectIDs)
return r.omgr.Concatenate(ctx, objectIDs, comp)
}

// maybeRetry executes the provided callback with or without automatic retries depending on how
Expand Down
2 changes: 1 addition & 1 deletion repo/maintenance/blob_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/blob_retain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/content_rewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
// run N sessions to create N individual pack blobs for each content prefix
for range tc.numPContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result()
return err
Expand All @@ -88,7 +88,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {

for range tc.numQContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result()
return err
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/maintenance_safety_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {

// create object that's immediately orphaned since nobody refers to it.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello world")
var err error
objectID, err = ow.Result()
Expand All @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {

// create another object in separate pack.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello universe")
_, err := ow.Result()
return err
Expand Down
9 changes: 6 additions & 3 deletions repo/object/object_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
w.description = opt.Description
w.prefix = opt.Prefix
w.compressor = compression.ByName[opt.Compressor]
w.metadataCompressor = compression.ByName[opt.MetadataCompressor]
w.totalLength = 0
w.currentPosition = 0

Expand Down Expand Up @@ -106,7 +107,7 @@ func (om *Manager) closedWriter(ow *objectWriter) {
// in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific,
// this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB),
// so this method should only be used for very large files where this overhead is relatively small.
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) {
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp compression.Name) (ID, error) {
if len(objectIDs) == 0 {
return EmptyID, errors.Errorf("empty list of objects")
}
Expand All @@ -131,8 +132,10 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error)
log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength)

w := om.NewWriter(ctx, WriterOptions{
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Compressor: metadataComp,
MetadataCompressor: metadataComp,
})
defer w.Close() //nolint:errcheck

Expand Down
59 changes: 49 additions & 10 deletions repo/object/object_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func (f *fakeContentManager) ContentInfo(ctx context.Context, contentID content.
defer f.mu.Unlock()

if d, ok := f.data[contentID]; ok {
return content.Info{ContentID: contentID, PackedLength: uint32(len(d))}, nil
return content.Info{ContentID: contentID, PackedLength: uint32(len(d)), CompressionHeaderID: f.compresionIDs[contentID]}, nil
}

return content.Info{}, blob.ErrBlobNotFound
Expand Down Expand Up @@ -175,18 +175,43 @@ func TestCompression_ContentCompressionEnabled(t *testing.T) {
_, _, om := setupTest(t, cmap)

w := om.NewWriter(ctx, WriterOptions{
Compressor: "gzip",
Compressor: "gzip",
MetadataCompressor: "zstd-fastest",
})
w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000))
oid, err := w.Result()
require.NoError(t, err)

cid, isCompressed, ok := oid.ContentID()

require.True(t, ok)
require.False(t, isCompressed) // oid will not indicate compression
require.Equal(t, compression.ByName["gzip"].HeaderID(), cmap[cid])
}

// TestCompression_IndirectContentCompressionEnabledMetadata checks that the
// MetadataCompressor writer option determines the compression header recorded
// for index (indirect) contents, independently of the Compressor option used
// for the data itself.
//
// Two objects are written through the same manager:
//   - one with MetadataCompressor "zstd-fastest", whose index contents are
//     expected to carry compression.HeaderZstdFastest;
//   - one with MetadataCompressor "none", whose index contents are expected
//     to carry content.NoCompression.
//
// NOTE(review): each payload is 4,000,000 bytes, presumably large enough to
// make the writer emit an index object; verifyIndirectBlock only inspects
// contents reachable through index object IDs, so a non-indirect result would
// pass without checking anything - confirm against the splitter configuration.
func TestCompression_IndirectContentCompressionEnabledMetadata(t *testing.T) {
	ctx := testlogging.Context(t)

	// cmap is handed to setupTest so the fake content manager can record the
	// compression header ID used for each content that gets written.
	cmap := map[content.ID]compression.HeaderID{}
	_, _, om := setupTest(t, cmap)

	w := om.NewWriter(ctx, WriterOptions{
		Compressor:         "gzip",
		MetadataCompressor: "zstd-fastest",
	})
	defer w.Close() //nolint:errcheck

	// Fail right here if the write itself breaks, rather than surfacing later
	// as a confusing mismatch inside verifyIndirectBlock.
	_, err := w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000000))
	require.NoError(t, err)

	oid, err := w.Result()
	require.NoError(t, err)
	verifyIndirectBlock(ctx, t, om, oid, compression.HeaderZstdFastest)

	// Second writer: metadata compression explicitly disabled.
	w2 := om.NewWriter(ctx, WriterOptions{
		MetadataCompressor: "none",
	})
	defer w2.Close() //nolint:errcheck

	_, err = w2.Write(bytes.Repeat([]byte{5, 6, 7, 8}, 1000000))
	require.NoError(t, err)

	oid2, err := w2.Result()
	require.NoError(t, err)
	verifyIndirectBlock(ctx, t, om, oid2, content.NoCompression)
}

func TestCompression_CustomSplitters(t *testing.T) {
cases := []struct {
wo WriterOptions
Expand Down Expand Up @@ -244,7 +269,8 @@ func TestCompression_ContentCompressionDisabled(t *testing.T) {
_, _, om := setupTest(t, nil)

w := om.NewWriter(ctx, WriterOptions{
Compressor: "gzip",
Compressor: "gzip",
MetadataCompressor: "zstd-fastest",
})
w.Write(bytes.Repeat([]byte{1, 2, 3, 4}, 1000))
oid, err := w.Result()
Expand Down Expand Up @@ -409,7 +435,7 @@ func verifyNoError(t *testing.T, err error) {
require.NoError(t, err)
}

func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID) {
func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID, expectedComp compression.HeaderID) {
t.Helper()

for indexContentID, isIndirect := oid.IndexObjectID(); isIndirect; indexContentID, isIndirect = indexContentID.IndexObjectID() {
Expand All @@ -418,6 +444,11 @@ func verifyIndirectBlock(ctx context.Context, t *testing.T, om *Manager, oid ID)
if !c.HasPrefix() {
t.Errorf("expected base content ID to be prefixed, was %v", c)
}
info, err := om.contentMgr.ContentInfo(ctx, c)
if err != nil {
t.Errorf("error getting content info for %v", err.Error())
}
require.Equal(t, expectedComp, info.CompressionHeaderID)
}

rd, err := Open(ctx, om.contentMgr, indexContentID)
Expand All @@ -443,6 +474,7 @@ func TestIndirection(t *testing.T) {
dataLength int
expectedBlobCount int
expectedIndirection int
metadataCompressor compression.Name
}{
{dataLength: 200, expectedBlobCount: 1, expectedIndirection: 0},
{dataLength: 1000, expectedBlobCount: 1, expectedIndirection: 0},
Expand All @@ -452,15 +484,18 @@ func TestIndirection(t *testing.T) {
// 1 blob of 1000 zeros + 1 index blob
{dataLength: 4000, expectedBlobCount: 2, expectedIndirection: 1},
// 1 blob of 1000 zeros + 1 index blob
{dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1},
{dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1, metadataCompressor: "none"},
// 1 blob of 1000 zeros + 1 index blob, enabled metadata compression
{dataLength: 10000, expectedBlobCount: 2, expectedIndirection: 1, metadataCompressor: "zstd-fastest"},
}

for _, c := range cases {
data, _, om := setupTest(t, nil)
cmap := map[content.ID]compression.HeaderID{}
data, _, om := setupTest(t, cmap)

contentBytes := make([]byte, c.dataLength)

writer := om.NewWriter(ctx, WriterOptions{})
writer := om.NewWriter(ctx, WriterOptions{MetadataCompressor: c.metadataCompressor})
writer.(*objectWriter).splitter = splitterFactory()

if _, err := writer.Write(contentBytes); err != nil {
Expand Down Expand Up @@ -491,7 +526,11 @@ func TestIndirection(t *testing.T) {
t.Errorf("invalid blob count for %v, got %v, wanted %v", result, got, want)
}

verifyIndirectBlock(ctx, t, om, result)
expectedCompressor := content.NoCompression
if len(c.metadataCompressor) > 0 && c.metadataCompressor != "none" {
expectedCompressor = compression.ByName[c.metadataCompressor].HeaderID()
}
verifyIndirectBlock(ctx, t, om, result, expectedCompressor)
}
}

Expand Down Expand Up @@ -578,7 +617,7 @@ func TestConcatenate(t *testing.T) {
}

for _, tc := range cases {
concatenatedOID, err := om.Concatenate(ctx, tc.inputs)
concatenatedOID, err := om.Concatenate(ctx, tc.inputs, "zstd-fastest")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -617,7 +656,7 @@ func TestConcatenate(t *testing.T) {
}

// make sure results of concatenation can be further concatenated.
concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID})
concatenated3OID, err := om.Concatenate(ctx, []ID{concatenatedOID, concatenatedOID, concatenatedOID}, "zstd-fastest")
if err != nil {
t.Fatal(err)
}
Expand Down
27 changes: 15 additions & 12 deletions repo/object/object_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type objectWriter struct {

om *Manager

compressor compression.Compressor
compressor compression.Compressor
metadataCompressor compression.Compressor

prefix content.IDPrefix
buffer gather.WriteBuffer
Expand Down Expand Up @@ -292,12 +293,13 @@ func (w *objectWriter) checkpointLocked() (ID, error) {
}

iw := &objectWriter{
ctx: w.ctx,
om: w.om,
compressor: nil,
description: "LIST(" + w.description + ")",
splitter: w.om.newDefaultSplitter(),
prefix: w.prefix,
ctx: w.ctx,
om: w.om,
compressor: w.metadataCompressor,
metadataCompressor: w.metadataCompressor,
description: "LIST(" + w.description + ")",
splitter: w.om.newDefaultSplitter(),
prefix: w.prefix,
}

if iw.prefix == "" {
Expand Down Expand Up @@ -334,9 +336,10 @@ func writeIndirectObject(w io.Writer, entries []IndirectObjectEntry) error {

// WriterOptions can be passed to Repository.NewWriter().
type WriterOptions struct {
Description string
Prefix content.IDPrefix // empty string or a single-character ('g'..'z')
Compressor compression.Name
Splitter string // use particular splitter instead of default
AsyncWrites int // allow up to N content writes to be asynchronous
Description string
Prefix content.IDPrefix // empty string or a single-character ('g'..'z')
Compressor compression.Name
MetadataCompressor compression.Name
Splitter string // use particular splitter instead of default
AsyncWrites int // allow up to N content writes to be asynchronous
}
6 changes: 3 additions & 3 deletions repo/repo_benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func BenchmarkWriterDedup1M(b *testing.B) {
ctx, env := repotesting.NewEnvironment(b, format.FormatVersion2)
dataBuf := make([]byte, 4<<20)

writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
writer.Write(dataBuf)
_, err := writer.Result()
require.NoError(b, err)
Expand All @@ -25,7 +25,7 @@ func BenchmarkWriterDedup1M(b *testing.B) {

for range b.N {
// write exactly the same data
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
writer.Write(dataBuf)
writer.Result()
writer.Close()
Expand All @@ -45,7 +45,7 @@ func BenchmarkWriterNoDedup1M(b *testing.B) {

for i := range b.N {
// write exactly the same data
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
writer := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

if i+chunkSize > len(dataBuf) {
chunkSize++
Expand Down
7 changes: 4 additions & 3 deletions repo/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
Expand Down Expand Up @@ -47,7 +48,7 @@ type RepositoryWriter interface {
Repository

NewObjectWriter(ctx context.Context, opt object.WriterOptions) object.Writer
ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error)
ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error)
PutManifest(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error)
ReplaceManifests(ctx context.Context, labels map[string]string, payload interface{}) (manifest.ID, error)
DeleteManifest(ctx context.Context, id manifest.ID) error
Expand Down Expand Up @@ -180,9 +181,9 @@ func (r *directRepository) NewObjectWriter(ctx context.Context, opt object.Write
}

// ConcatenateObjects creates a concatenated object from the provided object IDs.
func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
func (r *directRepository) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) {
//nolint:wrapcheck
return r.omgr.Concatenate(ctx, objectIDs)
return r.omgr.Concatenate(ctx, objectIDs, comp)
}

// DisableIndexRefresh disables index refresh for the duration of the write session.
Expand Down
Loading
Loading