Skip to content

Commit

Permalink
Configure compressor for k and x prefixed content
Browse files Browse the repository at this point in the history
Adds metadata compression setting to policy
Add support to configure compressor for k and x prefixed content
Set zstd-fastest as the default compressor for metadata in the policy
Adds support to set and show metadata compression to kopia policy commands
Adds metadata compression config to dir writer

Signed-off-by: Prasad Ghangal <[email protected]>
  • Loading branch information
PrasadG193 committed Aug 27, 2024
1 parent d37de83 commit dff0752
Show file tree
Hide file tree
Showing 34 changed files with 474 additions and 148 deletions.
6 changes: 6 additions & 0 deletions cli/command_policy_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type commandPolicySet struct {

policyActionFlags
policyCompressionFlags
policyMetadataCompressionFlags
policySplitterFlags
policyErrorFlags
policyFilesFlags
Expand All @@ -36,6 +37,7 @@ func (c *commandPolicySet) setup(svc appServices, parent commandParent) {

c.policyActionFlags.setup(cmd)
c.policyCompressionFlags.setup(cmd)
c.policyMetadataCompressionFlags.setup(cmd)
c.policySplitterFlags.setup(cmd)
c.policyErrorFlags.setup(cmd)
c.policyFilesFlags.setup(cmd)
Expand Down Expand Up @@ -108,6 +110,10 @@ func (c *commandPolicySet) setPolicyFromFlags(ctx context.Context, p *policy.Pol
return errors.Wrap(err, "compression policy")
}

if err := c.setMetadataCompressionPolicyFromFlags(ctx, &p.MetadataCompressionPolicy, changeCount); err != nil {
return errors.Wrap(err, "metadata compression policy")
}

if err := c.setSplitterPolicyFromFlags(ctx, &p.SplitterPolicy, changeCount); err != nil {
return errors.Wrap(err, "splitter policy")
}
Expand Down
31 changes: 31 additions & 0 deletions cli/command_policy_set_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,37 @@ type policyCompressionFlags struct {
policySetClearNeverCompress bool
}

type policyMetadataCompressionFlags struct {
policySetMetadataCompressionAlgorithm string
}

func (c *policyMetadataCompressionFlags) setup(cmd *kingpin.CmdClause) {
// Name of compression algorithm.
cmd.Flag("metadata-compression", "Metadata Compression algorithm").EnumVar(&c.policySetMetadataCompressionAlgorithm, supportedCompressionAlgorithms()...)
}

func (c *policyMetadataCompressionFlags) setMetadataCompressionPolicyFromFlags(
ctx context.Context,
p *policy.MetadataCompressionPolicy,
changeCount *int,
) error { //nolint:unparam
if v := c.policySetMetadataCompressionAlgorithm; v != "" {
*changeCount++

if v == inheritPolicyString {
log(ctx).Info(" - resetting compression algorithm to default value inherited from parent")

p.CompressorName = ""
} else {
log(ctx).Infof(" - setting compression algorithm to %v", v)

p.CompressorName = compression.Name(v)
}
}

return nil
}

func (c *policyCompressionFlags) setup(cmd *kingpin.CmdClause) {
// Name of compression algorithm.
cmd.Flag("compression", "Compression algorithm").EnumVar(&c.policySetCompressionAlgorithm, supportedCompressionAlgorithms()...)
Expand Down
13 changes: 13 additions & 0 deletions cli/command_policy_show.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func printPolicy(out *textOutput, p *policy.Policy, def *policy.Definition) {
rows = append(rows, policyTableRow{})
rows = appendCompressionPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendMetadataCompressionPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendSplitterPolicyRows(rows, p, def)
rows = append(rows, policyTableRow{})
rows = appendActionsPolicyRows(rows, p, def)
Expand Down Expand Up @@ -388,6 +390,17 @@ func appendCompressionPolicyRows(rows []policyTableRow, p *policy.Policy, def *p
return rows
}

func appendMetadataCompressionPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow {
if p.MetadataCompressionPolicy.CompressorName == "" || p.MetadataCompressionPolicy.CompressorName == "none" {
rows = append(rows, policyTableRow{"Metadata compression disabled.", "", ""})
return rows
}

return append(rows,
policyTableRow{"Metadata compression:", "", ""},
policyTableRow{" Compressor:", string(p.MetadataCompressionPolicy.CompressorName), definitionPointToString(p.Target(), def.MetadataCompressionPolicy.CompressorName)})
}

func appendSplitterPolicyRows(rows []policyTableRow, p *policy.Policy, def *policy.Definition) []policyTableRow {
algorithm := p.SplitterPolicy.Algorithm
if algorithm == "" {
Expand Down
10 changes: 9 additions & 1 deletion cli/command_snapshot_fix.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/manifest"
"github.com/kopia/kopia/snapshot"
"github.com/kopia/kopia/snapshot/policy"
"github.com/kopia/kopia/snapshot/snapshotfs"
)

Expand Down Expand Up @@ -90,12 +91,19 @@ func (c *commonRewriteSnapshots) rewriteMatchingSnapshots(ctx context.Context, r
for _, mg := range snapshot.GroupBySource(manifests) {
log(ctx).Infof("Processing snapshot %v", mg[0].Source)

policyTree, err := policy.TreeForSource(ctx, rep, mg[0].Source)
if err != nil {
return errors.Wrap(err, "unable to get policy tree")
}

metadataComp := policyTree.EffectivePolicy().MetadataCompressionPolicy.MetadataCompressor()

for _, man := range snapshot.SortByTime(mg, false) {
log(ctx).Debugf(" %v (%v)", formatTimestamp(man.StartTime.ToTime()), man.ID)

old := man.Clone()

changed, err := rw.RewriteSnapshotManifest(ctx, man)
changed, err := rw.RewriteSnapshotManifest(ctx, man, metadataComp)
if err != nil {
return errors.Wrap(err, "error rewriting manifest")
}
Expand Down
2 changes: 1 addition & 1 deletion internal/server/grpc_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func handleWriteContentRequest(ctx context.Context, dw repo.DirectRepositoryWrit
return accessDeniedResponse()
}

if strings.HasPrefix(req.GetPrefix(), manifest.ContentPrefix) {
if strings.HasPrefix(req.GetPrefix(), content.ManifestContentPrefix) {
// it's not allowed to create contents prefixed with 'm' since those could be mistaken for manifest contents.
return accessDeniedResponse()
}
Expand Down
5 changes: 3 additions & 2 deletions internal/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
mustReadObject(ctx, t, w, result, written)

ow := w.NewObjectWriter(ctx, object.WriterOptions{
Prefix: manifest.ContentPrefix,
Prefix: content.ManifestContentPrefix,
MetadataCompressor: "zstd-fastest",
})

_, err := ow.Write([]byte{2, 3, 4})
Expand Down Expand Up @@ -258,7 +259,7 @@ func remoteRepositoryTest(ctx context.Context, t *testing.T, rep repo.Repository
func mustWriteObject(ctx context.Context, t *testing.T, w repo.RepositoryWriter, data []byte) object.ID {
t.Helper()

ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

_, err := ow.Write(data)
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions repo/blob/storage_extend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down Expand Up @@ -103,7 +103,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing.
nro.RetentionMode = ""
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
5 changes: 5 additions & 0 deletions repo/content/content_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
DefaultIndexVersion = 2
)

// ManifestContentPrefix is the prefix of the content id for manifests.
const (
ManifestContentPrefix = "m"
)

var tracer = otel.Tracer("kopia/content")

// PackBlobIDPrefixes contains all possible prefixes for pack blobs.
Expand Down
4 changes: 3 additions & 1 deletion repo/content/content_manager_lock_free.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes

// If the content is prefixed (which represents Kopia's own metadata as opposed to user data),
// and we're on V2 format or greater, enable internal compression even when not requested.
if contentID.HasPrefix() && comp == NoCompression && mp.IndexVersion >= index.Version2 {
// Set compression only for manifest metadata.
// TODO: Remove this check once metadata compression setting is implemented for manifest metadata.
if contentID.HasPrefix() && contentID.Prefix() == ManifestContentPrefix && comp == NoCompression && mp.IndexVersion >= index.Version2 {
// 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON.
comp = compression.HeaderZstdFastest
}
Expand Down
57 changes: 55 additions & 2 deletions repo/content/content_manager_test.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion repo/format/upgrade_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) {
t.Helper()

w := rep.NewObjectWriter(ctx, object.WriterOptions{})
w := rep.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})

_, err := w.Write(data)
require.NoError(t, err, testCaseID)
Expand Down
6 changes: 3 additions & 3 deletions repo/grpc_repository_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,9 @@ func (r *grpcRepositoryClient) NewWriter(ctx context.Context, opt WriteSessionOp
}

// ConcatenateObjects creates a concatenated objects from the provided object IDs.
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID) (object.ID, error) {
func (r *grpcRepositoryClient) ConcatenateObjects(ctx context.Context, objectIDs []object.ID, comp compression.Name) (object.ID, error) {
//nolint:wrapcheck
return r.omgr.Concatenate(ctx, objectIDs)
return r.omgr.Concatenate(ctx, objectIDs, comp)
}

// maybeRetry executes the provided callback with or without automatic retries depending on how
Expand Down Expand Up @@ -721,7 +721,7 @@ func (r *grpcRepositoryClient) WriteContent(ctx context.Context, data gather.Byt
}

// we will be writing asynchronously and server will reject this write, fail early.
if prefix == manifest.ContentPrefix {
if prefix == content.ManifestContentPrefix {
return content.EmptyID, errors.Errorf("writing manifest contents not allowed")
}

Expand Down
2 changes: 1 addition & 1 deletion repo/maintenance/blob_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (s *formatSpecificTestSuite) TestDeleteUnreferencedBlobs(t *testing.T) {
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/blob_retain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) {
nro.RetentionPeriod = period
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/content_rewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {
// run N sessions to create N individual pack blobs for each content prefix
for range tc.numPContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{})
ow := w.NewObjectWriter(ctx, object.WriterOptions{MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result()
return err
Expand All @@ -88,7 +88,7 @@ func (s *formatSpecificTestSuite) TestContentRewrite(t *testing.T) {

for range tc.numQContents {
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "k", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "%v", uuid.NewString())
_, err := ow.Result()
return err
Expand Down
4 changes: 2 additions & 2 deletions repo/maintenance/maintenance_safety_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {

// create object that's immediately orphaned since nobody refers to it.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello world")
var err error
objectID, err = ow.Result()
Expand All @@ -43,7 +43,7 @@ func (s *formatSpecificTestSuite) TestMaintenanceSafety(t *testing.T) {

// create another object in separate pack.
require.NoError(t, repo.WriteSession(ctx, env.Repository, repo.WriteSessionOptions{}, func(ctx context.Context, w repo.RepositoryWriter) error {
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y"})
ow := w.NewObjectWriter(ctx, object.WriterOptions{Prefix: "y", MetadataCompressor: "zstd-fastest"})
fmt.Fprintf(ow, "hello universe")
_, err := ow.Result()
return err
Expand Down
4 changes: 2 additions & 2 deletions repo/manifest/committed_manifest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (m *committedManifestManager) writeEntriesLocked(ctx context.Context, entri
mustSucceed(gz.Flush())
mustSucceed(gz.Close())

contentID, err := m.b.WriteContent(ctx, buf.Bytes(), ContentPrefix, content.NoCompression)
contentID, err := m.b.WriteContent(ctx, buf.Bytes(), content.ManifestContentPrefix, content.NoCompression)
if err != nil {
return nil, errors.Wrap(err, "unable to write content")
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (m *committedManifestManager) loadCommittedContentsLocked(ctx context.Conte
manifests = map[content.ID]manifest{}

err := m.b.IterateContents(ctx, content.IterateOptions{
Range: index.PrefixRange(ContentPrefix),
Range: index.PrefixRange(content.ManifestContentPrefix),
Parallel: manifestLoadParallelism,
}, func(ci content.Info) error {
man, err := loadManifestContent(ctx, m.b, ci.ContentID)
Expand Down
8 changes: 2 additions & 6 deletions repo/manifest/manifest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,15 @@ import (
const (
manifestLoadParallelism = 8
manifestIDLength = 16

autoCompactionContentCountDefault = 16
)

var log = logging.Module("kopia/manifest") // +checklocksignore

// ErrNotFound is returned when the metadata item is not found.
var ErrNotFound = errors.New("not found")

// ContentPrefix is the prefix of the content id for manifests.
const (
ContentPrefix = "m"
autoCompactionContentCountDefault = 16
)

// TypeLabelKey is the label key for manifest type.
const TypeLabelKey = "type"

Expand Down
4 changes: 2 additions & 2 deletions repo/manifest/manifest_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestManifest(t *testing.T) {

if err := mgr.b.IterateContents(
ctx,
content.IterateOptions{Range: index.PrefixRange(ContentPrefix)},
content.IterateOptions{Range: index.PrefixRange(content.ManifestContentPrefix)},
func(ci content.Info) error {
foundContents++
return nil
Expand Down Expand Up @@ -447,7 +447,7 @@ func getManifestContentCount(ctx context.Context, t *testing.T, mgr *Manager) in

if err := mgr.b.IterateContents(
ctx,
content.IterateOptions{Range: index.PrefixRange(ContentPrefix)},
content.IterateOptions{Range: index.PrefixRange(content.ManifestContentPrefix)},
func(ci content.Info) error {
foundContents++
return nil
Expand Down
9 changes: 6 additions & 3 deletions repo/object/object_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
w.description = opt.Description
w.prefix = opt.Prefix
w.compressor = compression.ByName[opt.Compressor]
w.metadataCompressor = compression.ByName[opt.MetadataCompressor]
w.totalLength = 0
w.currentPosition = 0

Expand Down Expand Up @@ -106,7 +107,7 @@ func (om *Manager) closedWriter(ow *objectWriter) {
// in parallel utilizing more CPU cores. Because some split points now start at fixed boundaries and not content-specific,
// this causes some slight loss of deduplication at concatenation points (typically 1-2 contents, usually <10MB),
// so this method should only be used for very large files where this overhead is relatively small.
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error) {
func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID, metadataComp compression.Name) (ID, error) {
if len(objectIDs) == 0 {
return EmptyID, errors.Errorf("empty list of objects")
}
Expand All @@ -131,8 +132,10 @@ func (om *Manager) Concatenate(ctx context.Context, objectIDs []ID) (ID, error)
log(ctx).Debugf("concatenated: %v total: %v", concatenatedEntries, totalLength)

w := om.NewWriter(ctx, WriterOptions{
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Prefix: indirectContentPrefix,
Description: "CONCATENATED INDEX",
Compressor: metadataComp,
MetadataCompressor: metadataComp,
})
defer w.Close() //nolint:errcheck

Expand Down
Loading

0 comments on commit dff0752

Please sign in to comment.