Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/ydbcp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func main() {
xlog.Info(ctx, "Created TtlWatcher")
}

backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock())
backupScheduleHandler := handlers.NewBackupScheduleHandler(queries.NewWriteTableQuery, clockwork.NewRealClock(), configInstance.FeatureFlags)

schedule_watcher.NewScheduleWatcher(
ctx, &wg, configInstance.GetProcessorIntervalSeconds(), dbConnector,
Expand Down
57 changes: 51 additions & 6 deletions internal/backup_operations/make_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backup_operations

import (
"context"
"crypto/rand"
"errors"
"fmt"
"github.com/jonboulle/clockwork"
Expand Down Expand Up @@ -35,6 +36,7 @@ type MakeBackupInternalRequest struct {
ScheduleID *string
Ttl *time.Duration
ParentOperationID *string
EncryptionSettings *pb.EncryptionSettings
}

func FromBackupSchedule(schedule *types.BackupSchedule) MakeBackupInternalRequest {
Expand Down Expand Up @@ -62,6 +64,7 @@ func FromTBWROperation(tbwr *types.TakeBackupWithRetryOperation) MakeBackupInter
ScheduleID: tbwr.ScheduleID,
Ttl: tbwr.Ttl,
ParentOperationID: &tbwr.ID,
EncryptionSettings: tbwr.EncryptionSettings,
}
}

Expand Down Expand Up @@ -274,6 +277,34 @@ func IsEmptyBackup(backup *types.Backup) bool {
return backup.Size == 0 && backup.S3Endpoint == ""
}

func GetEncryptionParams(settings *pb.EncryptionSettings) ([]byte, string, error) {
var algorithm string
var length int

switch settings.Algorithm {
case pb.EncryptionSettings_UNSPECIFIED:
case pb.EncryptionSettings_AES_128_GCM:
algorithm = "AES-128-GCM"
length = 16
break
case pb.EncryptionSettings_AES_256_GCM:
algorithm = "AES-256-GCM"
length = 32
break
case pb.EncryptionSettings_CHACHA20_POLY1305:
algorithm = "ChaCha20-Poly1305"
length = 32
break
}

dek := make([]byte, length)
_, err := rand.Read(dek)
if err != nil {
return nil, "", err
}
return dek, algorithm, nil
}

func MakeBackup(
ctx context.Context,
clientConn client.ClientConnector,
Expand Down Expand Up @@ -350,6 +381,18 @@ func MakeBackup(
S3ForcePathStyle: s3.S3ForcePathStyle,
}

if req.EncryptionSettings != nil && featureFlags.EnableBackupEncryption {
dek, algorithm, err := GetEncryptionParams(req.EncryptionSettings)
if err != nil {
return nil, nil, err
}

s3Settings.EncryptionKey = dek
s3Settings.EncryptionAlgorithm = algorithm
// TODO: encrypt the DEK using the specified KEK
// TODO: stores the encrypted DEK in S3
}

clientOperationID, err := clientConn.ExportToS3(ctx, client, s3Settings, featureFlags)
if err != nil {
xlog.Error(ctx, "can't start export operation", zap.Error(err))
Expand Down Expand Up @@ -379,9 +422,10 @@ func MakeBackup(
CreatedAt: now,
Creator: subject,
},
ScheduleID: req.ScheduleID,
ExpireAt: expireAt,
SourcePaths: pathsForExport,
ScheduleID: req.ScheduleID,
ExpireAt: expireAt,
SourcePaths: pathsForExport,
EncryptionSettings: req.EncryptionSettings,
}

op := &types.TakeBackupOperation{
Expand All @@ -399,9 +443,10 @@ func MakeBackup(
CreatedAt: now,
Creator: subject,
},
YdbOperationId: clientOperationID,
UpdatedAt: now,
ParentOperationID: req.ParentOperationID,
YdbOperationId: clientOperationID,
UpdatedAt: now,
ParentOperationID: req.ParentOperationID,
EncryptionSettings: req.EncryptionSettings,
}

return backup, op, nil
Expand Down
5 changes: 3 additions & 2 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type MetricsServerConfig struct {
}

type FeatureFlagsConfig struct {
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"`
DisableTTLDeletion bool `yaml:"disable_ttl_deletion" default:"false"`
EnableNewPathsFormat bool `yaml:"enable_new_paths_format" default:"false"`
EnableBackupEncryption bool `yaml:"enable_backup_encryption" default:"false"`
}

type LogConfig struct {
Expand Down
22 changes: 22 additions & 0 deletions internal/connectors/client/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ func (d *ClientYdbConnector) ExportToS3(
exportRequest.Settings.DestinationPrefix = s3Settings.DestinationPrefix
}

if featureFlags.EnableBackupEncryption && len(s3Settings.EncryptionKey) > 0 {
exportRequest.Settings.EncryptionSettings = &Ydb_Export.EncryptionSettings{
EncryptionAlgorithm: s3Settings.EncryptionAlgorithm,
Key: &Ydb_Export.EncryptionSettings_SymmetricKey_{
SymmetricKey: &Ydb_Export.EncryptionSettings_SymmetricKey{
Key: s3Settings.EncryptionKey,
},
},
}
}

response, err := exportClient.ExportToS3(ctx, exportRequest)

if err != nil {
Expand Down Expand Up @@ -425,6 +436,17 @@ func (d *ClientYdbConnector) ImportFromS3(
importRequest.Settings.DestinationPath = path.Join(clientDb.Name(), s3Settings.DestinationPrefix)
}

if len(s3Settings.EncryptionKey) > 0 {
importRequest.Settings.EncryptionSettings = &Ydb_Export.EncryptionSettings{
EncryptionAlgorithm: s3Settings.EncryptionAlgorithm,
Key: &Ydb_Export.EncryptionSettings_SymmetricKey_{
SymmetricKey: &Ydb_Export.EncryptionSettings_SymmetricKey{
Key: s3Settings.EncryptionKey,
},
},
}
}

response, err := importClient.ImportFromS3(ctx, importRequest)

if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions internal/connectors/db/yql/queries/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
if tb.ParentOperationID != nil {
d.AddValueParam("$parent_operation_id", table_types.StringValueFromString(*tb.ParentOperationID))
}
if tb.EncryptionSettings != nil {
d.AddValueParam(
"$encryption_algorithm",
table_types.StringValueFromString(tb.EncryptionSettings.GetAlgorithm().String()),
)
d.AddValueParam(
"$kms_key_id",
table_types.StringValueFromString(tb.EncryptionSettings.GetKmsKey().GetKeyId()),
)
}
} else if operation.GetType() == types.OperationTypeTBWR {
tbwr, ok := operation.(*types.TakeBackupWithRetryOperation)
if !ok {
Expand Down Expand Up @@ -178,6 +188,16 @@ func BuildCreateOperationQuery(operation types.Operation, index int) WriteSingle
if tbwr.Ttl != nil {
d.AddValueParam("$ttl", table_types.IntervalValueFromDuration(*tbwr.Ttl))
}
if tbwr.EncryptionSettings != nil {
d.AddValueParam(
"$encryption_algorithm",
table_types.StringValueFromString(tbwr.EncryptionSettings.GetAlgorithm().String()),
)
d.AddValueParam(
"$kms_key_id",
table_types.StringValueFromString(tbwr.EncryptionSettings.GetKmsKey().GetKeyId()),
)
}
} else if operation.GetType() == types.OperationTypeRB {
rb, ok := operation.(*types.RestoreBackupOperation)
if !ok {
Expand Down Expand Up @@ -330,6 +350,17 @@ func BuildCreateBackupQuery(b types.Backup, index int) WriteSingleTableQueryImpl
if b.ExpireAt != nil {
d.AddValueParam("$expire_at", table_types.TimestampValueFromTime(*b.ExpireAt))
}

if b.EncryptionSettings != nil {
d.AddValueParam(
"$encryption_algorithm",
table_types.StringValueFromString(b.EncryptionSettings.GetAlgorithm().String()),
)
d.AddValueParam(
"$kms_key_id",
table_types.StringValueFromString(b.EncryptionSettings.GetKmsKey().GetKeyId()),
)
}
return d
}

Expand Down Expand Up @@ -380,6 +411,17 @@ func BuildCreateBackupScheduleQuery(schedule types.BackupSchedule, index int) Wr
"$recovery_point_objective",
table_types.IntervalValueFromDuration(schedule.ScheduleSettings.RecoveryPointObjective.AsDuration()),
)

if schedule.ScheduleSettings.EncryptionSettings != nil {
d.AddValueParam(
"$encryption_algorithm",
table_types.StringValueFromString(schedule.ScheduleSettings.EncryptionSettings.GetAlgorithm().String()),
)
d.AddValueParam(
"$kms_key_id",
table_types.StringValueFromString(schedule.ScheduleSettings.EncryptionSettings.GetKmsKey().GetKeyId()),
)
}
}
if schedule.NextLaunch != nil {
d.AddValueParam("$next_launch", table_types.TimestampValueFromTime(*schedule.NextLaunch))
Expand Down
8 changes: 8 additions & 0 deletions internal/connectors/db/yql/schema/create_tables.yql
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ CREATE TABLE Backups (

schedule_id String,

encryption_algorithm String,
kms_key_id String,

INDEX idx_container_id GLOBAL ON (container_id),
INDEX idx_created_at GLOBAL ON (created_at),
INDEX idx_expire_at GLOBAL ON (status, expire_at),
Expand Down Expand Up @@ -55,6 +58,8 @@ CREATE TABLE Operations (
paths_to_exclude String,
operation_id String,
parent_operation_id String,
encryption_algorithm String,
kms_key_id String,
--used only in TBWR
schedule_id String,
ttl Interval,
Expand Down Expand Up @@ -85,6 +90,9 @@ CREATE TABLE BackupSchedules (
initiated String,
created_at Timestamp,

encryption_algorithm String,
kms_key_id String,

recovery_point_objective Interval,

next_launch Timestamp,
Expand Down
8 changes: 8 additions & 0 deletions internal/handlers/schedule_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"ydbcp/internal/audit"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
Expand All @@ -20,11 +21,13 @@ type BackupScheduleHandlerType func(context.Context, db.DBConnector, *types.Back
func NewBackupScheduleHandler(
queryBuilderFactory queries.WriteQueryBuilderFactory,
clock clockwork.Clock,
featureFlags config.FeatureFlagsConfig,
) BackupScheduleHandlerType {
return func(ctx context.Context, driver db.DBConnector, schedule *types.BackupSchedule) error {
return BackupScheduleHandler(
ctx, driver, schedule,
queryBuilderFactory, clock,
featureFlags,
)
}
}
Expand All @@ -46,6 +49,7 @@ func BackupScheduleHandler(
schedule *types.BackupSchedule,
queryBuilderFactory queries.WriteQueryBuilderFactory,
clock clockwork.Clock,
featureFlags config.FeatureFlagsConfig,
) error {
if schedule.Status != types.BackupScheduleStateActive {
xlog.Error(ctx, "backup schedule is not active", zap.String("ScheduleID", schedule.ID))
Expand Down Expand Up @@ -85,6 +89,10 @@ func BackupScheduleHandler(
d := schedule.ScheduleSettings.Ttl.AsDuration()
tbwr.Ttl = &d
}

if schedule.ScheduleSettings.EncryptionSettings != nil && featureFlags.EnableBackupEncryption {
tbwr.EncryptionSettings = schedule.ScheduleSettings.EncryptionSettings
}
}

xlog.Info(
Expand Down
3 changes: 2 additions & 1 deletion internal/handlers/schedule_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"testing"
"time"
"ydbcp/internal/config"
"ydbcp/internal/connectors/db"
"ydbcp/internal/connectors/db/yql/queries"
"ydbcp/internal/types"
Expand Down Expand Up @@ -40,7 +41,7 @@ func TestBackupScheduleHandler(t *testing.T) {
)

handler := NewBackupScheduleHandler(
queries.NewWriteTableQueryMock, clock,
queries.NewWriteTableQueryMock, clock, config.FeatureFlagsConfig{},
)
err := handler(ctx, dbConnector, &schedule)
assert.Empty(t, err)
Expand Down
20 changes: 17 additions & 3 deletions internal/server/services/backup/backup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package backup
import (
"context"
"github.com/jonboulle/clockwork"
"google.golang.org/protobuf/proto"
"strconv"
"time"
"ydbcp/internal/audit"
Expand Down Expand Up @@ -110,9 +111,13 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
ctx = xlog.With(ctx, zap.String("SubjectID", subject))
now := timestamppb.Now()

var encryptionSettings *pb.EncryptionSettings
if req.EncryptionSettings != nil {
s.IncApiCallsCounter(methodName, codes.Unimplemented)
return nil, status.Error(codes.Unimplemented, "backup encryption is not supported yet")
if !s.featureFlags.EnableBackupEncryption {
s.IncApiCallsCounter(methodName, codes.Unimplemented)
return nil, status.Error(codes.Unimplemented, "backup encryption is not supported yet")
}
encryptionSettings = proto.Clone(req.EncryptionSettings).(*pb.EncryptionSettings)
}

tbwr := &types.TakeBackupWithRetryOperation{
Expand All @@ -130,7 +135,8 @@ func (s *BackupService) MakeBackup(ctx context.Context, req *pb.MakeBackupReques
Creator: subject,
CreatedAt: now,
},
UpdatedAt: now,
UpdatedAt: now,
EncryptionSettings: encryptionSettings,
},
Retries: 0,
RetryConfig: &pb.RetryConfig{
Expand Down Expand Up @@ -383,6 +389,14 @@ func (s *BackupService) MakeRestore(ctx context.Context, req *pb.MakeRestoreRequ
DestinationPrefix: req.GetDestinationPrefix(),
}

if backup.EncryptionSettings != nil {
// TODO: read encrypted DEK from s3
// TODO: decrypt DEC via KMS
// TODO: set EncryptionKey
_, algorithm, _ := backup_operations.GetEncryptionParams(backup.EncryptionSettings)
s3Settings.EncryptionAlgorithm = algorithm
}

clientOperationID, err := s.clientConn.ImportFromS3(ctx, clientDriver, s3Settings, s.featureFlags)
if err != nil {
xlog.Error(ctx, "can't start import operation", zap.Error(err))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *BackupScheduleService) CreateBackupSchedule(
return nil, status.Error(codes.FailedPrecondition, "no backup schedule settings for CreateBackupSchedule")
}

if request.ScheduleSettings.EncryptionSettings != nil {
if !s.config.FeatureFlags.EnableBackupEncryption && request.ScheduleSettings.EncryptionSettings != nil {
s.IncApiCallsCounter(methodName, codes.Unimplemented)
return nil, status.Error(codes.Unimplemented, "backup encryption is not supported yet")
}
Expand Down
Loading
Loading