Skip to content

Commit

Permalink
Merge pull request #6608 from shawn-hurley/feature/add-provider-inter…
Browse files Browse the repository at this point in the history
…face-blockmod

Feature/add provider interface for block mode
  • Loading branch information
reasonerjt authored Aug 16, 2023
2 parents e3b6063 + 563a16c commit 0b30adb
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 23 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/6608-shawn-hurley
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add API support for volMode block, only error for now.
14 changes: 12 additions & 2 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, paren
if !fs.initialized {
return errors.New("file system data path is not initialized")
}
volMode := getPersistentVolumeMode(source)

go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull, parentSnapshot, volMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -154,8 +155,10 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return errors.New("file system data path is not initialized")
}

volMode := getPersistentVolumeMode(target)

go func() {
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, fs)
err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, volMode, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand All @@ -169,6 +172,13 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro
return nil
}

func getPersistentVolumeMode(source AccessPoint) uploader.PersistentVolumeMode {
if source.ByBlock != "" {
return uploader.PersistentVolumeBlock
}
return uploader.PersistentVolumeFilesystem
}

// UpdateProgress which implement ProgressUpdater interface to update progress status
func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
if fs.callbacks.OnProgress != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/datapath/file_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestAsyncBackup(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestAsyncRestore(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR)
mockProvider := providerMock.NewProvider(t)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err)
mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err)
fs.uploaderProv = mockProvider
fs.initialized = true
fs.callbacks = test.callbacks
Expand Down
3 changes: 2 additions & 1 deletion pkg/datapath/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Callbacks struct {

// AccessPoint represents an access point that has been exposed to a data path instance
type AccessPoint struct {
ByPath string
ByPath string
ByBlock string
}

// AsyncBR is the interface for asynchronous data path methods
Expand Down
7 changes: 6 additions & 1 deletion pkg/uploader/kopia/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,15 @@ func setupDefaultPolicy() *policy.Tree {

// Backup backup specific sourcePath and update progress
func Backup(ctx context.Context, fsUploader SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string,
forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
if fsUploader == nil {
return nil, false, errors.New("get empty kopia uploader")
}

if volMode == uploader.PersistentVolumeBlock {
return nil, false, errors.New("unable to handle block storage")
}

dir, err := filepath.Abs(sourcePath)
if err != nil {
return nil, false, errors.Wrapf(err, "Invalid source path '%s'", sourcePath)
Expand Down
15 changes: 13 additions & 2 deletions pkg/uploader/kopia/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,7 @@ func TestBackup(t *testing.T) {
isSnapshotSourceError bool
expectedError error
expectedEmpty bool
volMode uploader.PersistentVolumeMode
}
manifest := &snapshot.Manifest{
ID: "test",
Expand All @@ -590,10 +591,20 @@ func TestBackup(t *testing.T) {
tags: nil,
expectedError: errors.New("Unable to read dir"),
},
{
name: "Unable to handle block mode",
sourcePath: "/",
tags: nil,
volMode: uploader.PersistentVolumeBlock,
expectedError: errors.New("unable to handle block storage"),
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
s := injectSnapshotFuncs()
args := []mockArgs{
{methodName: "LoadSnapshot", returns: []interface{}{manifest, nil}},
Expand All @@ -616,9 +627,9 @@ func TestBackup(t *testing.T) {
var snapshotInfo *uploader.SnapshotInfo
var err error
if tc.isEmptyUploader {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{})
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), nil, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{})
} else {
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.tags, &logrus.Logger{})
snapshotInfo, isSnapshotEmpty, err = Backup(context.Background(), s.uploderMock, s.repoWriterMock, tc.sourcePath, "", tc.forceFull, tc.parentSnapshot, tc.volMode, tc.tags, &logrus.Logger{})
}
// Check if the returned error matches the expected error
if tc.expectedError != nil {
Expand Down
14 changes: 13 additions & 1 deletion pkg/uploader/provider/kopia.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func (kp *kopiaProvider) RunBackup(
tags map[string]string,
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
Expand All @@ -127,6 +128,11 @@ func (kp *kopiaProvider) RunBackup(
return "", false, errors.New("path is empty")
}

// For now, error on block mode
if volMode == uploader.PersistentVolumeBlock {
return "", false, errors.New("unable to currently support block mode")
}

log := kp.log.WithFields(logrus.Fields{
"path": path,
"realSource": realSource,
Expand All @@ -153,7 +159,7 @@ func (kp *kopiaProvider) RunBackup(
tags[uploader.SnapshotRequesterTag] = kp.requestorType
tags[uploader.SnapshotUploaderTag] = uploader.KopiaType

snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, tags, log)
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, realSource, forceFull, parentSnapshot, volMode, tags, log)
if err != nil {
if kpUploader.IsCanceled() {
log.Error("Kopia backup is canceled")
Expand Down Expand Up @@ -197,11 +203,17 @@ func (kp *kopiaProvider) RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) error {
log := kp.log.WithFields(logrus.Fields{
"snapshotID": snapshotID,
"volumePath": volumePath,
})

if volMode == uploader.PersistentVolumeBlock {
return errors.New("unable to currently support block mode")
}

repoWriter := kopia.NewShimRepo(kp.bkRepo)
progress := new(kopia.Progress)
progress.InitThrottle(restoreProgressCheckInterval)
Expand Down
36 changes: 30 additions & 6 deletions pkg/uploader/provider/kopia_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,35 +68,47 @@ func TestRunBackup(t *testing.T) {

testCases := []struct {
name string
hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
hookBackupFunc func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
volMode uploader.PersistentVolumeMode
notError bool
}{
{
name: "success to backup",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, nil
},
notError: true,
},
{
name: "get error to backup",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return &uploader.SnapshotInfo{}, false, errors.New("failed to backup")
},
notError: false,
},
{
name: "got empty snapshot",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, errors.New("snapshot is empty")
},
notError: false,
},
{
name: "error on vol mode",
hookBackupFunc: func(ctx context.Context, fsUploader kopia.SnapshotUploader, repoWriter repo.RepositoryWriter, sourcePath string, realSource string, forceFull bool, parentSnapshot string, volMode uploader.PersistentVolumeMode, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
return nil, true, nil
},
volMode: uploader.PersistentVolumeBlock,
notError: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
BackupFunc = tc.hookBackupFunc
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", &updater)
_, _, err := kp.RunBackup(context.Background(), "var", "", nil, false, "", tc.volMode, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
Expand All @@ -115,6 +127,7 @@ func TestRunRestore(t *testing.T) {
name string
hookRestoreFunc func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error)
notError bool
volMode uploader.PersistentVolumeMode
}{
{
name: "normal restore",
Expand All @@ -130,12 +143,23 @@ func TestRunRestore(t *testing.T) {
},
notError: false,
},
{
name: "failed to restore block mode",
hookRestoreFunc: func(ctx context.Context, rep repo.RepositoryWriter, progress *kopia.Progress, snapshotID, dest string, log logrus.FieldLogger, cancleCh chan struct{}) (int64, int32, error) {
return 0, 0, errors.New("failed to restore")
},
volMode: uploader.PersistentVolumeBlock,
notError: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.volMode == "" {
tc.volMode = uploader.PersistentVolumeFilesystem
}
RestoreFunc = tc.hookRestoreFunc
err := kp.RunRestore(context.Background(), "", "/var", &updater)
err := kp.RunRestore(context.Background(), "", "/var", tc.volMode, &updater)
if tc.notError {
assert.NoError(t, err)
} else {
Expand Down
8 changes: 4 additions & 4 deletions pkg/uploader/provider/mocks/Provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/uploader/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ type Provider interface {
tags map[string]string,
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) (string, bool, error)
// RunRestore which will do restore for one specific volume with given snapshot id and return error
// updater is used for updating backup progress which implement by third-party
RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) error
// Close which will close related repository
Close(ctx context.Context) error
Expand Down
10 changes: 10 additions & 0 deletions pkg/uploader/provider/restic.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (rp *resticProvider) RunBackup(
tags map[string]string,
forceFull bool,
parentSnapshot string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) (string, bool, error) {
if updater == nil {
return "", false, errors.New("Need to initial backup progress updater first")
Expand All @@ -134,6 +135,10 @@ func (rp *resticProvider) RunBackup(
return "", false, errors.New("real source is not empty, this is not supported by restic uploader")
}

if volMode == uploader.PersistentVolumeBlock {
return "", false, errors.New("unable to support block mode")
}

log := rp.log.WithFields(logrus.Fields{
"path": path,
"parentSnapshot": parentSnapshot,
Expand Down Expand Up @@ -179,6 +184,7 @@ func (rp *resticProvider) RunRestore(
ctx context.Context,
snapshotID string,
volumePath string,
volMode uploader.PersistentVolumeMode,
updater uploader.ProgressUpdater) error {
if updater == nil {
return errors.New("Need to initial backup progress updater first")
Expand All @@ -188,6 +194,10 @@ func (rp *resticProvider) RunRestore(
"volumePath": volumePath,
})

if volMode == uploader.PersistentVolumeBlock {
return errors.New("unable to support block mode")
}

restoreCmd := resticRestoreCMDFunc(rp.repoIdentifier, rp.credentialsFile, snapshotID, volumePath)
restoreCmd.Env = rp.cmdEnv
restoreCmd.CACertFile = rp.caCertFile
Expand Down
Loading

0 comments on commit 0b30adb

Please sign in to comment.