Skip to content

Commit

Permalink
new data path for data mover ms
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Jun 6, 2024
1 parent a8d77ea commit a38cbbd
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 139 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/cli/repomantenance/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (o *Options) runRepoPrune(f velerocli.Factory, namespace string, logger log
credentials.CredentialGetter{
FromFile: credentialFileStore,
FromSecret: credentialSecretStore,
}, o.RepoType, cli, logger)
}, o.RepoType, logger)

Check warning on line 142 in pkg/cmd/cli/repomantenance/maintenance.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/repomantenance/maintenance.go#L142

Added line #L142 was not covered by tests
}

// backupRepository
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,19 @@ func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, fsRe
}

log.WithField("path", path.ByPath).Debug("Found host path")
if err := fsRestore.Init(ctx, dd.Spec.BackupStorageLocation, dd.Spec.SourceNamespace, datamover.GetUploaderType(dd.Spec.DataMover),
velerov1api.BackupRepositoryTypeKopia, "", r.repositoryEnsurer, r.credentialGetter); err != nil {

if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
BSLName: dd.Spec.BackupStorageLocation,
SourceNamespace: dd.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(dd.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {

Check warning on line 346 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L338-L346

Added lines #L338 - L346 were not covered by tests
return r.errorOut(ctx, dd, err, "error to initialize data path", log)
}

log.WithField("path", path.ByPath).Info("fs init")

if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path, dd.Spec.DataMoverConfig); err != nil {
Expand Down
21 changes: 18 additions & 3 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,23 +338,38 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)

func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Run cancelable dataUpload")

path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log)
}

log.WithField("path", path.ByPath).Debug("Found host path")
if err := fsBackup.Init(ctx, du.Spec.BackupStorageLocation, du.Spec.SourceNamespace, datamover.GetUploaderType(du.Spec.DataMover),
velerov1api.BackupRepositoryTypeKopia, "", r.repoEnsurer, r.credentialGetter); err != nil {

if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: du.Spec.BackupStorageLocation,
SourceNamespace: du.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(du.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, du, err, "error to initialize data path", log)
}

log.WithField("path", path.ByPath).Info("fs init")

tags := map[string]string{
velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel],
}

if err := fsBackup.StartBackup(path, datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC), "", false, tags, du.Spec.DataMoverConfig); err != nil {
if err := fsBackup.StartBackup(path, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{
RealSource: datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC),
ParentSnapshot: "",
ForceFull: false,
Tags: tags,
}); err != nil {
return r.errorOut(ctx, du, err, "error starting data path backup", log)
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
Expand Down Expand Up @@ -297,11 +296,11 @@ type fakeDataUploadFSBR struct {
clock clock.WithTickerAndDelayedExecution
}

func (f *fakeDataUploadFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (f *fakeDataUploadFSBR) Init(ctx context.Context, param interface{}) error {
return nil
}

func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs map[string]string) error {
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error {
du := f.du
original := f.du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
Expand Down
18 changes: 15 additions & 3 deletions pkg/controller/pod_volume_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,15 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ

log.WithField("path", path.ByPath).Debugf("Found host path")

if err := fsBackup.Init(ctx, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType,
podvolume.GetPvbRepositoryType(&pvb), pvb.Spec.RepoIdentifier, r.repositoryEnsurer, r.credentialGetter); err != nil {
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: pvb.Spec.BackupStorageLocation,
SourceNamespace: pvb.Spec.Pod.Namespace,
UploaderType: pvb.Spec.UploaderType,
RepositoryType: podvolume.GetPvbRepositoryType(&pvb),
RepoIdentifier: pvb.Spec.RepoIdentifier,
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, &pvb, err, "error to initialize data path", log)
}

Expand All @@ -179,7 +186,12 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
}

if err := fsBackup.StartBackup(path, "", parentSnapshotID, false, pvb.Spec.Tags, pvb.Spec.UploaderSettings); err != nil {
if err := fsBackup.StartBackup(path, pvb.Spec.UploaderSettings, &datapath.FSBRStartParam{
RealSource: "",
ParentSnapshot: parentSnapshotID,
ForceFull: false,
Tags: pvb.Spec.Tags,
}); err != nil {
return r.errorOut(ctx, &pvb, err, "error starting data path backup", log)
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/pod_volume_backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)

Expand Down Expand Up @@ -99,11 +98,11 @@ type fakeFSBR struct {
clock clock.WithTickerAndDelayedExecution
}

func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (b *fakeFSBR) Init(ctx context.Context, param interface{}) error {
return nil
}

func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs map[string]string) error {
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error {
pvb := b.pvb

original := b.pvb.DeepCopy()
Expand Down
11 changes: 9 additions & 2 deletions pkg/controller/pod_volume_restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,15 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req

log.WithField("path", volumePath.ByPath).Debugf("Found host path")

if err := fsRestore.Init(ctx, pvr.Spec.BackupStorageLocation, pvr.Spec.SourceNamespace, pvr.Spec.UploaderType,
podvolume.GetPvrRepositoryType(pvr), pvr.Spec.RepoIdentifier, c.repositoryEnsurer, c.credentialGetter); err != nil {
if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
BSLName: pvr.Spec.BackupStorageLocation,
SourceNamespace: pvr.Spec.SourceNamespace,
UploaderType: pvr.Spec.UploaderType,
RepositoryType: podvolume.GetPvrRepositoryType(pvr),
RepoIdentifier: pvr.Spec.RepoIdentifier,
RepositoryEnsurer: c.repositoryEnsurer,
CredentialGetter: c.credentialGetter,
}); err != nil {

Check warning on line 152 in pkg/controller/pod_volume_restore_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/pod_volume_restore_controller.go#L144-L152

Added lines #L144 - L152 were not covered by tests
return c.errorOut(ctx, pvr, err, "error to initialize data path", log)
}

Expand Down
61 changes: 42 additions & 19 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@ import (
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)

// FSBRInitParam define the input param for FSBR init
type FSBRInitParam struct {
BSLName string
SourceNamespace string
UploaderType string
RepositoryType string
RepoIdentifier string
RepositoryEnsurer *repository.Ensurer
CredentialGetter *credentials.CredentialGetter
Filesystem filesystem.Interface
}

// FSBRStartParam define the input param for FSBR start
type FSBRStartParam struct {
RealSource string
ParentSnapshot string
ForceFull bool
Tags map[string]string
}

type fileSystemBR struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -61,8 +81,9 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client,
return fs
}

func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string,
repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (fs *fileSystemBR) Init(ctx context.Context, param interface{}) error {
initParam := param.(*FSBRInitParam)

Check warning on line 85 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L84-L85

Added lines #L84 - L85 were not covered by tests

var err error
defer func() {
if err != nil {
Expand All @@ -75,38 +96,38 @@ func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespac
backupLocation := &velerov1api.BackupStorageLocation{}
if err = fs.client.Get(ctx, client.ObjectKey{
Namespace: fs.namespace,
Name: bslName,
Name: initParam.BSLName,

Check warning on line 99 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L99

Added line #L99 was not covered by tests
}, backupLocation); err != nil {
return errors.Wrapf(err, "error getting backup storage location %s", bslName)
return errors.Wrapf(err, "error getting backup storage location %s", initParam.BSLName)

Check warning on line 101 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L101

Added line #L101 was not covered by tests
}

fs.backupLocation = backupLocation

fs.backupRepo, err = repositoryEnsurer.EnsureRepo(ctx, fs.namespace, sourceNamespace, bslName, repositoryType)
fs.backupRepo, err = initParam.RepositoryEnsurer.EnsureRepo(ctx, fs.namespace, initParam.SourceNamespace, initParam.BSLName, initParam.RepositoryType)

Check warning on line 106 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L106

Added line #L106 was not covered by tests
if err != nil {
return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", bslName, sourceNamespace, repositoryType)
return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)

Check warning on line 108 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L108

Added line #L108 was not covered by tests
}

err = fs.boostRepoConnect(ctx, repositoryType, credentialGetter)
err = fs.boostRepoConnect(ctx, initParam.RepositoryType, initParam.CredentialGetter)

Check warning on line 111 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L111

Added line #L111 was not covered by tests
if err != nil {
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", bslName, sourceNamespace, repositoryType)
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)

Check warning on line 113 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L113

Added line #L113 was not covered by tests
}

fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, repoIdentifier,
fs.backupLocation, fs.backupRepo, credentialGetter, repokey.RepoKeySelector(), fs.log)
fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, initParam.UploaderType, fs.requestorType, initParam.RepoIdentifier,
fs.backupLocation, fs.backupRepo, initParam.CredentialGetter, repokey.RepoKeySelector(), fs.log)

Check warning on line 117 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L116-L117

Added lines #L116 - L117 were not covered by tests
if err != nil {
return errors.Wrapf(err, "error creating uploader %s", uploaderType)
return errors.Wrapf(err, "error creating uploader %s", initParam.UploaderType)

Check warning on line 119 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L119

Added line #L119 was not covered by tests
}

fs.initialized = true

fs.log.WithFields(
logrus.Fields{
"jobName": fs.jobName,
"bsl": bslName,
"source namespace": sourceNamespace,
"uploader": uploaderType,
"repository": repositoryType,
"bsl": initParam.BSLName,
"source namespace": initParam.SourceNamespace,
"uploader": initParam.UploaderType,
"repository": initParam.RepositoryType,

Check warning on line 130 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L127-L130

Added lines #L127 - L130 were not covered by tests
}).Info("FileSystemBR is initialized")

return nil
Expand All @@ -129,14 +150,16 @@ func (fs *fileSystemBR) Close(ctx context.Context) {
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
}

func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfig map[string]string) error {
func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error {
if !fs.initialized {
return errors.New("file system data path is not initialized")
}

backupParam := param.(*FSBRStartParam)

go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull,
parentSnapshot, source.VolMode, uploaderConfig, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand Down Expand Up @@ -192,7 +215,7 @@ func (fs *fileSystemBR) Cancel() {

func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter) error {
if repositoryType == velerov1api.BackupRepositoryTypeKopia {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.client, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {

Check warning on line 218 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L218

Added line #L218 was not covered by tests
return err
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions pkg/datapath/file_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ func TestAsyncBackup(t *testing.T) {
fs.initialized = true
fs.callbacks = test.callbacks

err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", "", false, nil, map[string]string{})
err := fs.StartBackup(AccessPoint{ByPath: test.path}, map[string]string{}, &FSBRStartParam{})
require.NoError(t, err)

<-finish

assert.Equal(t, asyncErr, test.err)
assert.Equal(t, asyncResult, test.result)
assert.Equal(t, test.err, asyncErr)
assert.Equal(t, test.result, asyncResult)
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx c
m.trackerLock.Lock()
defer m.trackerLock.Unlock()

if len(m.tracker) == m.cocurrentNum {
if len(m.tracker) >= m.cocurrentNum {
return nil, ConcurrentLimitExceed
}

Expand Down
Loading

0 comments on commit a38cbbd

Please sign in to comment.