From 86e54801c5f5df81c56ceffeb4d0d362eb9a7f55 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Tue, 30 Jul 2024 17:14:11 +0800 Subject: [PATCH] data mover micro service restore Signed-off-by: Lyndon-Li --- changelogs/unreleased/8061-Lyndon-Li | 1 + pkg/cmd/cli/datamover/restore.go | 193 +++++++++- pkg/cmd/cli/datamover/restore_test.go | 166 +++++++++ pkg/datamover/backup_micro_service.go | 6 +- pkg/datamover/backup_micro_service_test.go | 2 +- pkg/datamover/restore_micro_service.go | 295 +++++++++++++++ pkg/datamover/restore_micro_service_test.go | 394 ++++++++++++++++++++ 7 files changed, 1046 insertions(+), 11 deletions(-) create mode 100644 changelogs/unreleased/8061-Lyndon-Li create mode 100644 pkg/cmd/cli/datamover/restore_test.go create mode 100644 pkg/datamover/restore_micro_service.go create mode 100644 pkg/datamover/restore_micro_service_test.go diff --git a/changelogs/unreleased/8061-Lyndon-Li b/changelogs/unreleased/8061-Lyndon-Li new file mode 100644 index 0000000000..64236059a6 --- /dev/null +++ b/changelogs/unreleased/8061-Lyndon-Li @@ -0,0 +1 @@ +Data mover micro service restore according to design #7576 \ No newline at end of file diff --git a/pkg/cmd/cli/datamover/restore.go b/pkg/cmd/cli/datamover/restore.go index ddd44729f5..fbd92fa189 100644 --- a/pkg/cmd/cli/datamover/restore.go +++ b/pkg/cmd/cli/datamover/restore.go @@ -14,16 +14,37 @@ limitations under the License. package datamover import ( + "context" "fmt" + "os" "strings" "time" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/buildinfo" "github.com/vmware-tanzu/velero/pkg/client" + "github.com/vmware-tanzu/velero/pkg/cmd/util/signals" + "github.com/vmware-tanzu/velero/pkg/datamover" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" + + ctlcache "sigs.k8s.io/controller-runtime/pkg/cache" + ctlclient "sigs.k8s.io/controller-runtime/pkg/client" ) type dataMoverRestoreConfig struct { @@ -52,7 +73,10 @@ func NewRestoreCommand(f client.Factory) *cobra.Command { logger.Infof("Starting Velero data-mover restore %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA()) f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name())) - s := newdataMoverRestore(logger, config) + s, err := newdataMoverRestore(logger, f, config) + if err != nil { + exitWithMessage(logger, false, "Failed to create data mover restore, %v", err) + } s.run() }, @@ -74,19 +98,174 @@ func NewRestoreCommand(f client.Factory) *cobra.Command { } type dataMoverRestore struct { - logger logrus.FieldLogger - config dataMoverRestoreConfig + logger logrus.FieldLogger + ctx context.Context + cancelFunc context.CancelFunc + client ctlclient.Client + cache ctlcache.Cache + namespace string + nodeName string + config dataMoverRestoreConfig + kubeClient kubernetes.Interface + dataPathMgr *datapath.Manager } -func newdataMoverRestore(logger logrus.FieldLogger, config dataMoverRestoreConfig) *dataMoverRestore { +func newdataMoverRestore(logger logrus.FieldLogger, factory client.Factory, config dataMoverRestoreConfig) (*dataMoverRestore, error) { + ctx, cancelFunc := context.WithCancel(context.Background()) + + clientConfig, err := factory.ClientConfig() + if err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to create client config") + } + + ctrl.SetLogger(zap.New(zap.UseDevMode(true))) + + scheme := runtime.NewScheme() + if err := velerov1api.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to add velero v1 scheme") + } + + if err := velerov2alpha1api.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to add velero v2alpha1 scheme") + } + + if err := v1.AddToScheme(scheme); err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to add core v1 scheme") + } + + nodeName := os.Getenv("NODE_NAME") + + // use a field selector to filter to only pods scheduled on this node. + cacheOption := ctlcache.Options{ + Scheme: scheme, + ByObject: map[ctlclient.Object]ctlcache.ByObject{ + &v1.Pod{}: { + Field: fields.Set{"spec.nodeName": nodeName}.AsSelector(), + }, + &velerov2alpha1api.DataDownload{}: { + Field: fields.Set{"metadata.namespace": factory.Namespace()}.AsSelector(), + }, + }, + } + + cli, err := ctlclient.New(clientConfig, ctlclient.Options{ + Scheme: scheme, + }) + if err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to create client") + } + + cache, err := ctlcache.New(clientConfig, cacheOption) + if err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to create client cache") + } + s := &dataMoverRestore{ - logger: logger, - config: config, + logger: logger, + ctx: ctx, + cancelFunc: cancelFunc, + client: cli, + cache: cache, + config: config, + namespace: factory.Namespace(), + nodeName: nodeName, + } + + s.kubeClient, err = factory.KubeClient() + if err != nil { + cancelFunc() + return nil, errors.Wrap(err, "error to create kube client") } - return s + s.dataPathMgr = datapath.NewManager(1) + + return s, nil } +var funcCreateDataPathRestore = (*dataMoverRestore).createDataPathService + func (s *dataMoverRestore) run() { + signals.CancelOnShutdown(s.cancelFunc, s.logger) + go func() { + if err := s.cache.Start(s.ctx); err != nil { + s.logger.WithError(err).Warn("error starting cache") + } + }() + + // TODOOO: call s.runDataPath() time.Sleep(time.Duration(1<<63 - 1)) } + +func (s *dataMoverRestore) runDataPath() { + s.logger.Infof("Starting micro service in node %s for dd %s", s.nodeName, s.config.ddName) + + dpService, err := funcCreateDataPathRestore(s) + if err != nil { + s.cancelFunc() + funcExitWithMessage(s.logger, false, "Failed to create data path service for DataDownload %s: %v", s.config.ddName, err) + return + } + + s.logger.Infof("Starting data path service %s", s.config.ddName) + + err = dpService.Init() + if err != nil { + s.cancelFunc() + funcExitWithMessage(s.logger, false, "Failed to init data path service for DataDownload %s: %v", s.config.ddName, err) + return + } + + result, err := dpService.RunCancelableDataPath(s.ctx) + if err != nil { + s.cancelFunc() + funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err) + return + } + + s.logger.WithField("dd", s.config.ddName).Info("Data path service completed") + + dpService.Shutdown() + + s.logger.WithField("dd", s.config.ddName).Info("Data path service is shut down") + + s.cancelFunc() + + funcExitWithMessage(s.logger, true, result) +} + +func (s *dataMoverRestore) createDataPathService() (dataPathService, error) { + credentialFileStore, err := funcNewCredentialFileStore( + s.client, + s.namespace, + defaultCredentialsDirectory, + filesystem.NewFileSystem(), + ) + if err != nil { + return nil, errors.Wrapf(err, "error to create credential file store") + } + + credSecretStore, err := funcNewCredentialSecretStore(s.client, s.namespace) + if err != nil { + return nil, errors.Wrapf(err, "error to create credential secret store") + } + + credGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} + + duInformer, err := s.cache.GetInformer(s.ctx, &velerov2alpha1api.DataDownload{}) + if err != nil { + return nil, errors.Wrap(err, "error to get controller-runtime informer from manager") + } + + repoEnsurer := repository.NewEnsurer(s.client, s.logger, s.config.resourceTimeout) + + return datamover.NewRestoreMicroService(s.ctx, s.client, s.kubeClient, s.config.ddName, s.namespace, s.nodeName, datapath.AccessPoint{ + ByPath: s.config.volumePath, + VolMode: uploader.PersistentVolumeMode(s.config.volumeMode), + }, s.dataPathMgr, repoEnsurer, credGetter, duInformer, s.logger), nil +} diff --git a/pkg/cmd/cli/datamover/restore_test.go b/pkg/cmd/cli/datamover/restore_test.go new file mode 100644 index 0000000000..664c2bdbc9 --- /dev/null +++ b/pkg/cmd/cli/datamover/restore_test.go @@ -0,0 +1,166 @@ +/* +Copyright The Velero Contributors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datamover + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + cacheMock "github.com/vmware-tanzu/velero/pkg/cmd/cli/datamover/mocks" + velerotest "github.com/vmware-tanzu/velero/pkg/test" +) + +func fakeCreateDataPathRestoreWithErr(_ *dataMoverRestore) (dataPathService, error) { + return nil, errors.New("fake-create-data-path-error") +} + +func fakeCreateDataPathRestore(_ *dataMoverRestore) (dataPathService, error) { + return frHelper, nil +} + +func TestRunDataPathRestore(t *testing.T) { + tests := []struct { + name string + ddName string + createDataPathFail bool + initDataPathErr error + runCancelableDataPathErr error + runCancelableDataPathResult string + expectedMessage string + expectedSucceed bool + }{ + { + name: "create data path failed", + ddName: "fake-name", + createDataPathFail: true, + expectedMessage: "Failed to create data path service for DataDownload fake-name: fake-create-data-path-error", + }, + { + name: "init data path failed", + ddName: "fake-name", + initDataPathErr: errors.New("fake-init-data-path-error"), + expectedMessage: "Failed to init data path service for DataDownload fake-name: fake-init-data-path-error", + }, + { + name: "run data path failed", + ddName: "fake-name", + runCancelableDataPathErr: errors.New("fake-run-data-path-error"), + expectedMessage: "Failed to run data path service for DataDownload fake-name: fake-run-data-path-error", + }, + { + name: "succeed", + ddName: "fake-name", + runCancelableDataPathResult: "fake-run-data-path-result", + expectedMessage: "fake-run-data-path-result", + expectedSucceed: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + frHelper = &fakeRunHelper{ + initErr: test.initDataPathErr, + runCancelableDataPathErr: test.runCancelableDataPathErr, + runCancelableDataPathResult: test.runCancelableDataPathResult, + } + + if test.createDataPathFail { + funcCreateDataPathRestore = fakeCreateDataPathRestoreWithErr + } else { + funcCreateDataPathRestore = fakeCreateDataPathRestore + } + + funcExitWithMessage = frHelper.ExitWithMessage + + s := &dataMoverRestore{ + logger: velerotest.NewLogger(), + cancelFunc: func() {}, + config: dataMoverRestoreConfig{ + ddName: test.ddName, + }, + } + + s.runDataPath() + + assert.Equal(t, test.expectedMessage, frHelper.exitMessage) + assert.Equal(t, test.expectedSucceed, frHelper.succeed) + }) + } +} + +func TestCreateDataPathRestore(t *testing.T) { + tests := []struct { + name string + fileStoreErr error + secretStoreErr error + mockGetInformer bool + getInformerErr error + expectedError string + }{ + { + name: "create credential file store error", + fileStoreErr: errors.New("fake-file-store-error"), + expectedError: "error to create credential file store: fake-file-store-error", + }, + { + name: "create credential secret store", + secretStoreErr: errors.New("fake-secret-store-error"), + expectedError: "error to create credential secret store: fake-secret-store-error", + }, + { + name: "get informer error", + mockGetInformer: true, + getInformerErr: errors.New("fake-get-informer-error"), + expectedError: "error to get controller-runtime informer from manager: fake-get-informer-error", + }, + { + name: "succeed", + mockGetInformer: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fcHelper := &fakeCreateDataPathServiceHelper{ + fileStoreErr: test.fileStoreErr, + secretStoreErr: test.secretStoreErr, + } + + funcNewCredentialFileStore = fcHelper.NewNamespacedFileStore + funcNewCredentialSecretStore = fcHelper.NewNamespacedSecretStore + + cache := cacheMock.NewCache(t) + if test.mockGetInformer { + cache.On("GetInformer", mock.Anything, mock.Anything).Return(nil, test.getInformerErr) + } + + funcExitWithMessage = frHelper.ExitWithMessage + + s := &dataMoverRestore{ + cache: cache, + } + + _, err := s.createDataPathService() + + if test.expectedError != "" { + assert.EqualError(t, err, test.expectedError) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/datamover/backup_micro_service.go b/pkg/datamover/backup_micro_service.go index 2f4fa6e56d..ee594cac98 100644 --- a/pkg/datamover/backup_micro_service.go +++ b/pkg/datamover/backup_micro_service.go @@ -127,7 +127,7 @@ func (r *BackupMicroService) Init() error { return err } -var waitDuTimeout time.Duration = time.Minute * 2 +var waitControllerTimeout time.Duration = time.Minute * 2 func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) { log := r.logger.WithFields(logrus.Fields{ @@ -135,7 +135,7 @@ func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, }) du := &velerov2alpha1api.DataUpload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitDuTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataUploadName, @@ -313,7 +313,7 @@ func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) { func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) { r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled") - r.eventRecorder.Event(du, false, "Canceling", "Canceing for data upload %s", du.Name) + r.eventRecorder.Event(du, false, "Canceling", "Canceling for data upload %s", du.Name) fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) if fsBackup == nil { diff --git a/pkg/datamover/backup_micro_service_test.go b/pkg/datamover/backup_micro_service_test.go index 8fd595e93e..9db38d4c0d 100644 --- a/pkg/datamover/backup_micro_service_test.go +++ b/pkg/datamover/backup_micro_service_test.go @@ -412,7 +412,7 @@ func TestRunCancelableDataPath(t *testing.T) { return fsBR } - waitDuTimeout = time.Second + waitControllerTimeout = time.Second if test.result != nil { go func() { diff --git a/pkg/datamover/restore_micro_service.go b/pkg/datamover/restore_micro_service.go new file mode 100644 index 0000000000..508469701a --- /dev/null +++ b/pkg/datamover/restore_micro_service.go @@ -0,0 +1,295 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datamover + +import ( + "context" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/repository" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/util/kube" + + cachetool "k8s.io/client-go/tools/cache" +) + +// RestoreMicroService process data mover restores inside the restore pod +type RestoreMicroService struct { + ctx context.Context + client client.Client + kubeClient kubernetes.Interface + repoEnsurer *repository.Ensurer + credentialGetter *credentials.CredentialGetter + logger logrus.FieldLogger + dataPathMgr *datapath.Manager + eventRecorder kube.EventRecorder + + namespace string + dataDownloadName string + dataDownload *velerov2alpha1api.DataDownload + sourceTargetPath datapath.AccessPoint + + resultSignal chan dataPathResult + + ddInformer cache.Informer + ddHandler cachetool.ResourceEventHandlerRegistration + nodeName string +} + +func NewRestoreMicroService(ctx context.Context, client client.Client, kubeClient kubernetes.Interface, dataDownloadName string, namespace string, nodeName string, + sourceTargetPath datapath.AccessPoint, dataPathMgr *datapath.Manager, repoEnsurer *repository.Ensurer, cred *credentials.CredentialGetter, + ddInformer cache.Informer, log logrus.FieldLogger) *RestoreMicroService { + return &RestoreMicroService{ + ctx: ctx, + client: client, + kubeClient: kubeClient, + credentialGetter: cred, + logger: log, + repoEnsurer: repoEnsurer, + dataPathMgr: dataPathMgr, + namespace: namespace, + dataDownloadName: dataDownloadName, + sourceTargetPath: sourceTargetPath, + nodeName: nodeName, + resultSignal: make(chan dataPathResult), + ddInformer: ddInformer, + } +} + +func (r *RestoreMicroService) Init() error { + r.eventRecorder = kube.NewEventRecorder(r.kubeClient, r.client.Scheme(), r.dataDownloadName, r.nodeName) + + handler, err := r.ddInformer.AddEventHandler( + cachetool.ResourceEventHandlerFuncs{ + UpdateFunc: func(oldObj interface{}, newObj interface{}) { + oldDd := oldObj.(*velerov2alpha1api.DataDownload) + newDd := newObj.(*velerov2alpha1api.DataDownload) + + if newDd.Name != r.dataDownloadName { + return + } + + if newDd.Status.Phase != velerov2alpha1api.DataDownloadPhaseInProgress { + return + } + + if newDd.Spec.Cancel && !oldDd.Spec.Cancel { + r.cancelDataDownload(newDd) + } + }, + }, + ) + + if err != nil { + return errors.Wrap(err, "error adding dd handler") + } + + r.ddHandler = handler + + return err +} + +func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string, error) { + log := r.logger.WithFields(logrus.Fields{ + "datadownload": r.dataDownloadName, + }) + + dd := &velerov2alpha1api.DataDownload{} + err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := r.client.Get(ctx, types.NamespacedName{ + Namespace: r.namespace, + Name: r.dataDownloadName, + }, dd) + if apierrors.IsNotFound(err) { + return false, nil + } + + if err != nil { + return true, errors.Wrapf(err, "error to get dd %s", r.dataDownloadName) + } + + if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + return true, nil + } else { + return false, nil + } + }) + if err != nil { + log.WithError(err).Error("Failed to wait dd") + return "", errors.Wrap(err, "error waiting for dd") + } + + r.dataDownload = dd + + log.Info("Run cancelable dataDownload") + + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataDownloadCompleted, + OnFailed: r.OnDataDownloadFailed, + OnCancelled: r.OnDataDownloadCancelled, + OnProgress: r.OnDataDownloadProgress, + } + + fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log) + if err != nil { + return "", errors.Wrap(err, "error to create data path") + } + + log.Debug("Found volume path") + if err := fsRestore.Init(ctx, + &datapath.FSBRInitParam{ + BSLName: dd.Spec.BackupStorageLocation, + SourceNamespace: dd.Spec.SourceNamespace, + UploaderType: GetUploaderType(dd.Spec.DataMover), + RepositoryType: velerov1api.BackupRepositoryTypeKopia, + RepoIdentifier: "", + RepositoryEnsurer: r.repoEnsurer, + CredentialGetter: r.credentialGetter, + }); err != nil { + return "", errors.Wrap(err, "error to initialize data path") + } + log.Info("fs init") + + if err := fsRestore.StartRestore(dd.Spec.SnapshotID, r.sourceTargetPath, dd.Spec.DataMoverConfig); err != nil { + return "", errors.Wrap(err, "error starting data path restore") + } + + log.Info("Async fs restore data path started") + r.eventRecorder.Event(dd, false, datapath.EventReasonStarted, "Data path for %s started", dd.Name) + + result := "" + select { + case <-ctx.Done(): + err = errors.New("timed out waiting for fs restore to complete") + break + case res := <-r.resultSignal: + err = res.err + result = res.result + break + } + + if err != nil { + log.WithError(err).Error("Async fs restore was not completed") + } + + return result, err +} + +func (r *RestoreMicroService) Shutdown() { + r.eventRecorder.Shutdown() + r.closeDataPath(r.ctx, r.dataDownloadName) + + if r.ddHandler != nil { + if err := r.ddInformer.RemoveEventHandler(r.ddHandler); err != nil { + r.logger.WithError(err).Warn("Failed to remove pod handler") + } + } +} + +func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { + defer r.closeDataPath(ctx, ddName) + + log := r.logger.WithField("datadownload", ddName) + + restoreBytes, err := funcMarshal(result.Restore) + if err != nil { + log.WithError(err).Errorf("Failed to marshal restore result %v", result.Restore) + r.resultSignal <- dataPathResult{ + err: errors.Wrapf(err, "Failed to marshal restore result %v", result.Restore), + } + } else { + r.eventRecorder.Event(r.dataDownload, false, datapath.EventReasonCompleted, string(restoreBytes)) + r.resultSignal <- dataPathResult{ + result: string(restoreBytes), + } + } + + log.Info("Async fs restore data path completed") +} + +func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { + defer r.closeDataPath(ctx, ddName) + + log := r.logger.WithField("datadownload", ddName) + log.WithError(err).Error("Async fs restore data path failed") + + r.eventRecorder.Event(r.dataDownload, false, datapath.EventReasonFailed, "Data path for data download %s failed, error %v", r.dataDownloadName, err) + r.resultSignal <- dataPathResult{ + err: errors.Wrapf(err, "Data path for data download %s failed", r.dataDownloadName), + } +} + +func (r *RestoreMicroService) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { + defer r.closeDataPath(ctx, ddName) + + log := r.logger.WithField("datadownload", ddName) + log.Warn("Async fs restore data path canceled") + + r.eventRecorder.Event(r.dataDownload, false, datapath.EventReasonCancelled, "Data path for data download %s canceled", ddName) + r.resultSignal <- dataPathResult{ + err: errors.New(datapath.ErrCancelled), + } +} + +func (r *RestoreMicroService) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { + log := r.logger.WithFields(logrus.Fields{ + "datadownload": ddName, + }) + + progressBytes, err := funcMarshal(progress) + if err != nil { + log.WithError(err).Errorf("Failed to marshal progress %v", progress) + return + } + + r.eventRecorder.Event(r.dataDownload, false, datapath.EventReasonProgress, string(progressBytes)) +} + +func (r *RestoreMicroService) closeDataPath(ctx context.Context, ddName string) { + fsRestore := r.dataPathMgr.GetAsyncBR(ddName) + if fsRestore != nil { + fsRestore.Close(ctx) + } + + r.dataPathMgr.RemoveAsyncBR(ddName) +} + +func (r *RestoreMicroService) cancelDataDownload(dd *velerov2alpha1api.DataDownload) { + r.logger.WithField("DataDownload", dd.Name).Info("Data download is being canceled") + + r.eventRecorder.Event(dd, false, "Canceling", "Canceling for data download %s", dd.Name) + + fsBackup := r.dataPathMgr.GetAsyncBR(dd.Name) + if fsBackup == nil { + r.OnDataDownloadCancelled(r.ctx, dd.GetNamespace(), dd.GetName()) + } else { + fsBackup.Cancel() + } +} diff --git a/pkg/datamover/restore_micro_service_test.go b/pkg/datamover/restore_micro_service_test.go new file mode 100644 index 0000000000..e3ef8701da --- /dev/null +++ b/pkg/datamover/restore_micro_service_test.go @@ -0,0 +1,394 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datamover + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "k8s.io/apimachinery/pkg/runtime" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" + clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks" + "github.com/vmware-tanzu/velero/pkg/uploader" + + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + + velerotest "github.com/vmware-tanzu/velero/pkg/test" +) + +func TestOnDataDownloadFailed(t *testing.T) { + dataDownloadName := "fake-data-download" + bt := &backupMsTestHelper{} + + bs := &RestoreMicroService{ + dataDownloadName: dataDownloadName, + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + expectedErr := "Data path for data download fake-data-download failed: fake-error" + expectedEventReason := datapath.EventReasonFailed + expectedEventMsg := "Data path for data download fake-data-download failed, error fake-error" + + go bs.OnDataDownloadFailed(context.TODO(), velerov1api.DefaultNamespace, dataDownloadName, errors.New("fake-error")) + + result := <-bs.resultSignal + assert.EqualError(t, result.err, expectedErr) + assert.Equal(t, expectedEventReason, bt.EventReason()) + assert.Equal(t, expectedEventMsg, bt.EventMessage()) +} + +func TestOnDataDownloadCancelled(t *testing.T) { + dataDownloadName := "fake-data-download" + bt := &backupMsTestHelper{} + + bs := &RestoreMicroService{ + dataDownloadName: dataDownloadName, + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + expectedErr := datapath.ErrCancelled + expectedEventReason := datapath.EventReasonCancelled + expectedEventMsg := "Data path for data download fake-data-download canceled" + + go bs.OnDataDownloadCancelled(context.TODO(), velerov1api.DefaultNamespace, dataDownloadName) + + result := <-bs.resultSignal + assert.EqualError(t, result.err, expectedErr) + assert.Equal(t, expectedEventReason, bt.EventReason()) + assert.Equal(t, expectedEventMsg, bt.EventMessage()) +} + +func TestOnDataDownloadCompleted(t *testing.T) { + tests := []struct { + name string + expectedErr string + expectedEventReason string + expectedEventMsg string + marshalErr error + marshallStr string + }{ + { + name: "marshal fail", + marshalErr: errors.New("fake-marshal-error"), + expectedErr: "Failed to marshal restore result {{ }}: fake-marshal-error", + }, + { + name: "succeed", + marshallStr: "fake-complete-string", + expectedEventReason: datapath.EventReasonCompleted, + expectedEventMsg: "fake-complete-string", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dataDownloadName := "fake-data-download" + + bt := &backupMsTestHelper{ + marshalErr: test.marshalErr, + marshalBytes: []byte(test.marshallStr), + } + + bs := &RestoreMicroService{ + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + funcMarshal = bt.Marshal + + go bs.OnDataDownloadCompleted(context.TODO(), velerov1api.DefaultNamespace, dataDownloadName, datapath.Result{}) + + result := <-bs.resultSignal + if test.marshalErr != nil { + assert.EqualError(t, result.err, test.expectedErr) + } else { + assert.NoError(t, result.err) + assert.Equal(t, test.expectedEventReason, bt.EventReason()) + assert.Equal(t, test.expectedEventMsg, bt.EventMessage()) + } + }) + } +} + +func TestOnDataDownloadProgress(t *testing.T) { + tests := []struct { + name string + expectedEventReason string + expectedEventMsg string + marshalErr error + marshallStr string + }{ + { + name: "marshal fail", + marshalErr: errors.New("fake-marshal-error"), + }, + { + name: "succeed", + marshallStr: "fake-progress-string", + expectedEventReason: datapath.EventReasonProgress, + expectedEventMsg: "fake-progress-string", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dataDownloadName := "fake-data-download" + + bt := &backupMsTestHelper{ + marshalErr: test.marshalErr, + marshalBytes: []byte(test.marshallStr), + } + + bs := &RestoreMicroService{ + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + logger: velerotest.NewLogger(), + } + + funcMarshal = bt.Marshal + + bs.OnDataDownloadProgress(context.TODO(), velerov1api.DefaultNamespace, dataDownloadName, &uploader.Progress{}) + + if test.marshalErr != nil { + assert.False(t, bt.withEvent) + } else { + assert.True(t, bt.withEvent) + assert.Equal(t, test.expectedEventReason, bt.EventReason()) + assert.Equal(t, test.expectedEventMsg, bt.EventMessage()) + } + }) + } +} + +func TestCancelDataDownload(t *testing.T) { + tests := []struct { + name string + expectedEventReason string + expectedEventMsg string + expectedErr string + }{ + { + name: "no fs restore", + expectedEventReason: datapath.EventReasonCancelled, + expectedEventMsg: "Data path for data download fake-data-download canceled", + expectedErr: datapath.ErrCancelled, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + dataDownloadName := "fake-data-download" + dd := builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName).Result() + + bt := &backupMsTestHelper{} + + bs := &RestoreMicroService{ + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + go bs.cancelDataDownload(dd) + + result := <-bs.resultSignal + + assert.EqualError(t, result.err, test.expectedErr) + assert.True(t, bt.withEvent) + assert.Equal(t, test.expectedEventReason, bt.EventReason()) + assert.Equal(t, test.expectedEventMsg, bt.EventMessage()) + }) + } +} + +func TestRunCancelableRestore(t *testing.T) { + dataDownloadName := "fake-data-download" + dd := builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName).Phase(velerov2alpha1api.DataDownloadPhaseNew).Result() + ddInProgress := builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName).Phase(velerov2alpha1api.DataDownloadPhaseInProgress).Result() + ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second) + + tests := []struct { + name string + ctx context.Context + result *dataPathResult + dataPathMgr *datapath.Manager + kubeClientObj []runtime.Object + initErr error + startErr error + dataPathStarted bool + expectedEventMsg string + expectedErr string + }{ + { + name: "no dd", + ctx: context.Background(), + expectedErr: "error waiting for dd: context deadline exceeded", + }, + { + name: "dd not in in-progress", + ctx: context.Background(), + kubeClientObj: []runtime.Object{dd}, + expectedErr: "error waiting for dd: context deadline exceeded", + }, + { + name: "create data path fail", + ctx: context.Background(), + kubeClientObj: []runtime.Object{ddInProgress}, + dataPathMgr: datapath.NewManager(0), + expectedErr: "error to create data path: Concurrent number exceeds", + }, + { + name: "init data path fail", + ctx: context.Background(), + kubeClientObj: []runtime.Object{ddInProgress}, + initErr: errors.New("fake-init-error"), + expectedErr: "error to initialize data path: fake-init-error", + }, + { + name: "start data path fail", + ctx: context.Background(), + kubeClientObj: []runtime.Object{ddInProgress}, + startErr: errors.New("fake-start-error"), + expectedErr: "error starting data path restore: fake-start-error", + }, + { + name: "data path timeout", + ctx: ctxTimeout, + kubeClientObj: []runtime.Object{ddInProgress}, + dataPathStarted: true, + expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName), + expectedErr: "timed out waiting for fs restore to complete", + }, + { + name: "data path returns error", + ctx: context.Background(), + kubeClientObj: []runtime.Object{ddInProgress}, + dataPathStarted: true, + result: &dataPathResult{ + err: errors.New("fake-data-path-error"), + }, + expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName), + expectedErr: "fake-data-path-error", + }, + { + name: "succeed", + ctx: context.Background(), + kubeClientObj: []runtime.Object{ddInProgress}, + dataPathStarted: true, + result: &dataPathResult{ + result: "fake-succeed-result", + }, + expectedEventMsg: fmt.Sprintf("Data path for %s started", dataDownloadName), + }, + } + + scheme := runtime.NewScheme() + velerov2alpha1api.AddToScheme(scheme) + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClientBuilder := clientFake.NewClientBuilder() + fakeClientBuilder = fakeClientBuilder.WithScheme(scheme) + + fakeClient := fakeClientBuilder.WithRuntimeObjects(test.kubeClientObj...).Build() + + bt := &backupMsTestHelper{} + + rs := &RestoreMicroService{ + namespace: velerov1api.DefaultNamespace, + dataDownloadName: dataDownloadName, + ctx: context.Background(), + client: fakeClient, + dataPathMgr: datapath.NewManager(1), + eventRecorder: bt, + resultSignal: make(chan dataPathResult), + logger: velerotest.NewLogger(), + } + + if test.ctx != nil { + rs.ctx = test.ctx + } + + if test.dataPathMgr != nil { + rs.dataPathMgr = test.dataPathMgr + } + + datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + fsBR := datapathmockes.NewAsyncBR(t) + if test.initErr != nil { + fsBR.On("Init", mock.Anything, mock.Anything).Return(test.initErr) + } + + if test.startErr != nil { + fsBR.On("Init", mock.Anything, mock.Anything).Return(nil) + fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.startErr) + } + + if test.dataPathStarted { + fsBR.On("Init", mock.Anything, mock.Anything).Return(nil) + fsBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(nil) + } + + return fsBR + } + + waitControllerTimeout = time.Second + + if test.result != nil { + go func() { + time.Sleep(time.Millisecond * 500) + rs.resultSignal <- *test.result + }() + } + + result, err := rs.RunCancelableDataPath(test.ctx) + + if test.expectedErr != "" { + assert.EqualError(t, err, test.expectedErr) + } else { + assert.NoError(t, err) + assert.Equal(t, test.result.result, result) + } + + if test.expectedEventMsg != "" { + assert.True(t, bt.withEvent) + assert.Equal(t, test.expectedEventMsg, bt.EventMessage()) + } + }) + } + + cancel() +}