Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New data path for data mover ms #7955

Merged
merged 1 commit into from
Jul 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/unreleased/7955-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New data path for data mover ms according to design #7574
2 changes: 1 addition & 1 deletion pkg/cmd/cli/repomantenance/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
credentials.CredentialGetter{
FromFile: credentialFileStore,
FromSecret: credentialSecretStore,
}, o.RepoType, cli, logger)
}, o.RepoType, logger)

Check warning on line 142 in pkg/cmd/cli/repomantenance/maintenance.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/repomantenance/maintenance.go#L142

Added line #L142 was not covered by tests
}

// backupRepository
Expand Down
13 changes: 11 additions & 2 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,19 @@
}

log.WithField("path", path.ByPath).Debug("Found host path")
if err := fsRestore.Init(ctx, dd.Spec.BackupStorageLocation, dd.Spec.SourceNamespace, datamover.GetUploaderType(dd.Spec.DataMover),
velerov1api.BackupRepositoryTypeKopia, "", r.repositoryEnsurer, r.credentialGetter); err != nil {

if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
BSLName: dd.Spec.BackupStorageLocation,
SourceNamespace: dd.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(dd.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {

Check warning on line 346 in pkg/controller/data_download_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/data_download_controller.go#L338-L346

Added lines #L338 - L346 were not covered by tests
return r.errorOut(ctx, dd, err, "error to initialize data path", log)
}

log.WithField("path", path.ByPath).Info("fs init")

if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path, dd.Spec.DataMoverConfig); err != nil {
Expand Down
21 changes: 18 additions & 3 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,23 +338,38 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request)

func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
log.Info("Run cancelable dataUpload")

path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log)
}

log.WithField("path", path.ByPath).Debug("Found host path")
if err := fsBackup.Init(ctx, du.Spec.BackupStorageLocation, du.Spec.SourceNamespace, datamover.GetUploaderType(du.Spec.DataMover),
velerov1api.BackupRepositoryTypeKopia, "", r.repoEnsurer, r.credentialGetter); err != nil {

if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: du.Spec.BackupStorageLocation,
SourceNamespace: du.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(du.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repoEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, du, err, "error to initialize data path", log)
}

log.WithField("path", path.ByPath).Info("fs init")

tags := map[string]string{
velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel],
}

if err := fsBackup.StartBackup(path, datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC), "", false, tags, du.Spec.DataMoverConfig); err != nil {
if err := fsBackup.StartBackup(path, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{
RealSource: datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC),
ParentSnapshot: "",
ForceFull: false,
Tags: tags,
}); err != nil {
return r.errorOut(ctx, du, err, "error starting data path backup", log)
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/exposer"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
"github.com/vmware-tanzu/velero/pkg/uploader"
"github.com/vmware-tanzu/velero/pkg/util/boolptr"
Expand Down Expand Up @@ -297,11 +296,11 @@ type fakeDataUploadFSBR struct {
clock clock.WithTickerAndDelayedExecution
}

func (f *fakeDataUploadFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (f *fakeDataUploadFSBR) Init(ctx context.Context, param interface{}) error {
return nil
}

func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs map[string]string) error {
func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error {
du := f.du
original := f.du.DeepCopy()
du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted
Expand Down
18 changes: 15 additions & 3 deletions pkg/controller/pod_volume_backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,15 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ

log.WithField("path", path.ByPath).Debugf("Found host path")

if err := fsBackup.Init(ctx, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType,
podvolume.GetPvbRepositoryType(&pvb), pvb.Spec.RepoIdentifier, r.repositoryEnsurer, r.credentialGetter); err != nil {
if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{
BSLName: pvb.Spec.BackupStorageLocation,
SourceNamespace: pvb.Spec.Pod.Namespace,
UploaderType: pvb.Spec.UploaderType,
RepositoryType: podvolume.GetPvbRepositoryType(&pvb),
RepoIdentifier: pvb.Spec.RepoIdentifier,
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, &pvb, err, "error to initialize data path", log)
}

Expand All @@ -179,7 +186,12 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}
}

if err := fsBackup.StartBackup(path, "", parentSnapshotID, false, pvb.Spec.Tags, pvb.Spec.UploaderSettings); err != nil {
if err := fsBackup.StartBackup(path, pvb.Spec.UploaderSettings, &datapath.FSBRStartParam{
RealSource: "",
ParentSnapshot: parentSnapshotID,
ForceFull: false,
Tags: pvb.Spec.Tags,
}); err != nil {
return r.errorOut(ctx, &pvb, err, "error starting data path backup", log)
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/pod_volume_backup_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/datapath"
"github.com/vmware-tanzu/velero/pkg/metrics"
"github.com/vmware-tanzu/velero/pkg/repository"
velerotest "github.com/vmware-tanzu/velero/pkg/test"
)

Expand Down Expand Up @@ -99,11 +98,11 @@ type fakeFSBR struct {
clock clock.WithTickerAndDelayedExecution
}

func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (b *fakeFSBR) Init(ctx context.Context, param interface{}) error {
return nil
}

func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfigs map[string]string) error {
func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error {
pvb := b.pvb

original := b.pvb.DeepCopy()
Expand Down
11 changes: 9 additions & 2 deletions pkg/controller/pod_volume_restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,15 @@

log.WithField("path", volumePath.ByPath).Debugf("Found host path")

if err := fsRestore.Init(ctx, pvr.Spec.BackupStorageLocation, pvr.Spec.SourceNamespace, pvr.Spec.UploaderType,
podvolume.GetPvrRepositoryType(pvr), pvr.Spec.RepoIdentifier, c.repositoryEnsurer, c.credentialGetter); err != nil {
if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
BSLName: pvr.Spec.BackupStorageLocation,
SourceNamespace: pvr.Spec.SourceNamespace,
UploaderType: pvr.Spec.UploaderType,
RepositoryType: podvolume.GetPvrRepositoryType(pvr),
RepoIdentifier: pvr.Spec.RepoIdentifier,
RepositoryEnsurer: c.repositoryEnsurer,
CredentialGetter: c.credentialGetter,
}); err != nil {

Check warning on line 152 in pkg/controller/pod_volume_restore_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/pod_volume_restore_controller.go#L144-L152

Added lines #L144 - L152 were not covered by tests
return c.errorOut(ctx, pvr, err, "error to initialize data path", log)
}

Expand Down
61 changes: 42 additions & 19 deletions pkg/datapath/file_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)

// FSBRInitParam define the input param for FSBR init
type FSBRInitParam struct {
BSLName string
SourceNamespace string
UploaderType string
RepositoryType string
RepoIdentifier string
RepositoryEnsurer *repository.Ensurer
CredentialGetter *credentials.CredentialGetter
Filesystem filesystem.Interface
}

// FSBRStartParam define the input param for FSBR start
type FSBRStartParam struct {
RealSource string
ParentSnapshot string
ForceFull bool
Tags map[string]string
}

type fileSystemBR struct {
ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -61,8 +81,9 @@
return fs
}

func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string,
repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error {
func (fs *fileSystemBR) Init(ctx context.Context, param interface{}) error {
initParam := param.(*FSBRInitParam)

Check warning on line 85 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L84-L85

Added lines #L84 - L85 were not covered by tests

var err error
defer func() {
if err != nil {
Expand All @@ -75,38 +96,38 @@
backupLocation := &velerov1api.BackupStorageLocation{}
if err = fs.client.Get(ctx, client.ObjectKey{
Namespace: fs.namespace,
Name: bslName,
Name: initParam.BSLName,

Check warning on line 99 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L99

Added line #L99 was not covered by tests
}, backupLocation); err != nil {
return errors.Wrapf(err, "error getting backup storage location %s", bslName)
return errors.Wrapf(err, "error getting backup storage location %s", initParam.BSLName)

Check warning on line 101 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L101

Added line #L101 was not covered by tests
}

fs.backupLocation = backupLocation

fs.backupRepo, err = repositoryEnsurer.EnsureRepo(ctx, fs.namespace, sourceNamespace, bslName, repositoryType)
fs.backupRepo, err = initParam.RepositoryEnsurer.EnsureRepo(ctx, fs.namespace, initParam.SourceNamespace, initParam.BSLName, initParam.RepositoryType)

Check warning on line 106 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L106

Added line #L106 was not covered by tests
if err != nil {
return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", bslName, sourceNamespace, repositoryType)
return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)

Check warning on line 108 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L108

Added line #L108 was not covered by tests
}

err = fs.boostRepoConnect(ctx, repositoryType, credentialGetter)
err = fs.boostRepoConnect(ctx, initParam.RepositoryType, initParam.CredentialGetter)

Check warning on line 111 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L111

Added line #L111 was not covered by tests
if err != nil {
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", bslName, sourceNamespace, repositoryType)
return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", initParam.BSLName, initParam.SourceNamespace, initParam.RepositoryType)

Check warning on line 113 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L113

Added line #L113 was not covered by tests
}

fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, repoIdentifier,
fs.backupLocation, fs.backupRepo, credentialGetter, repokey.RepoKeySelector(), fs.log)
fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, initParam.UploaderType, fs.requestorType, initParam.RepoIdentifier,
fs.backupLocation, fs.backupRepo, initParam.CredentialGetter, repokey.RepoKeySelector(), fs.log)

Check warning on line 117 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L116-L117

Added lines #L116 - L117 were not covered by tests
if err != nil {
return errors.Wrapf(err, "error creating uploader %s", uploaderType)
return errors.Wrapf(err, "error creating uploader %s", initParam.UploaderType)

Check warning on line 119 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L119

Added line #L119 was not covered by tests
}

fs.initialized = true

fs.log.WithFields(
logrus.Fields{
"jobName": fs.jobName,
"bsl": bslName,
"source namespace": sourceNamespace,
"uploader": uploaderType,
"repository": repositoryType,
"bsl": initParam.BSLName,
"source namespace": initParam.SourceNamespace,
"uploader": initParam.UploaderType,
"repository": initParam.RepositoryType,

Check warning on line 130 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L127-L130

Added lines #L127 - L130 were not covered by tests
}).Info("FileSystemBR is initialized")

return nil
Expand All @@ -129,14 +150,16 @@
fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed")
}

func (fs *fileSystemBR) StartBackup(source AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string, uploaderConfig map[string]string) error {
func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error {
if !fs.initialized {
return errors.New("file system data path is not initialized")
}

backupParam := param.(*FSBRStartParam)

go func() {
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, realSource, tags, forceFull,
parentSnapshot, source.VolMode, uploaderConfig, fs)
snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull,
backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs)

if err == provider.ErrorCanceled {
fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName)
Expand Down Expand Up @@ -192,7 +215,7 @@

func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter) error {
if repositoryType == velerov1api.BackupRepositoryTypeKopia {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.client, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {

Check warning on line 218 in pkg/datapath/file_system.go

View check run for this annotation

Codecov / codecov/patch

pkg/datapath/file_system.go#L218

Added line #L218 was not covered by tests
return err
}
} else {
Expand Down
6 changes: 3 additions & 3 deletions pkg/datapath/file_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ func TestAsyncBackup(t *testing.T) {
fs.initialized = true
fs.callbacks = test.callbacks

err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", "", false, nil, map[string]string{})
err := fs.StartBackup(AccessPoint{ByPath: test.path}, map[string]string{}, &FSBRStartParam{})
require.NoError(t, err)

<-finish

assert.Equal(t, asyncErr, test.err)
assert.Equal(t, asyncResult, test.result)
assert.Equal(t, test.err, asyncErr)
assert.Equal(t, test.result, asyncResult)
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/datapath/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx c
m.trackerLock.Lock()
defer m.trackerLock.Unlock()

if len(m.tracker) == m.cocurrentNum {
if len(m.tracker) >= m.cocurrentNum {
return nil, ConcurrentLimitExceed
}

Expand Down
Loading
Loading