diff --git a/changelogs/unreleased/6436-qiuming-best b/changelogs/unreleased/6436-qiuming-best
new file mode 100644
index 0000000000..adb564bb9b
--- /dev/null
+++ b/changelogs/unreleased/6436-qiuming-best
@@ -0,0 +1 @@
+Add data download controller for data mover
diff --git a/pkg/builder/data_download_builder.go b/pkg/builder/data_download_builder.go
new file mode 100644
index 0000000000..0842c0c324
--- /dev/null
+++ b/pkg/builder/data_download_builder.go
@@ -0,0 +1,103 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package builder
+
+import (
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
+)
+
+// DataDownloadBuilder builds DataDownload objects
+type DataDownloadBuilder struct {
+	object *velerov2alpha1api.DataDownload
+}
+
+// ForDataDownload is the constructor for a DataDownloadBuilder.
+func ForDataDownload(ns, name string) *DataDownloadBuilder {
+	return &DataDownloadBuilder{
+		object: &velerov2alpha1api.DataDownload{
+			TypeMeta: metav1.TypeMeta{
+				APIVersion: velerov2alpha1api.SchemeGroupVersion.String(),
+				Kind:       "DataDownload",
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				Namespace: ns,
+				Name:      name,
+			},
+		},
+	}
+}
+
+// Result returns the built DataDownload.
+func (d *DataDownloadBuilder) Result() *velerov2alpha1api.DataDownload {
+	return d.object
+}
+
+// BackupStorageLocation sets the DataDownload's backup storage location.
+func (d *DataDownloadBuilder) BackupStorageLocation(name string) *DataDownloadBuilder {
+	d.object.Spec.BackupStorageLocation = name
+	return d
+}
+
+// Phase sets the DataDownload's phase.
+func (d *DataDownloadBuilder) Phase(phase velerov2alpha1api.DataDownloadPhase) *DataDownloadBuilder {
+	d.object.Status.Phase = phase
+	return d
+}
+
+// SnapshotID sets the DataDownload's SnapshotID.
+func (d *DataDownloadBuilder) SnapshotID(id string) *DataDownloadBuilder {
+	d.object.Spec.SnapshotID = id
+	return d
+}
+
+// DataMover sets the DataDownload's DataMover.
+func (d *DataDownloadBuilder) DataMover(dataMover string) *DataDownloadBuilder {
+	d.object.Spec.DataMover = dataMover
+	return d
+}
+
+// SourceNamespace sets the DataDownload's SourceNamespace.
+func (d *DataDownloadBuilder) SourceNamespace(sourceNamespace string) *DataDownloadBuilder {
+	d.object.Spec.SourceNamespace = sourceNamespace
+	return d
+}
+
+// TargetVolume sets the DataDownload's TargetVolume.
+func (d *DataDownloadBuilder) TargetVolume(targetVolume velerov2alpha1api.TargetVolumeSpec) *DataDownloadBuilder {
+	d.object.Spec.TargetVolume = targetVolume
+	return d
+}
+
+// Cancel sets the DataDownload's Cancel.
+func (d *DataDownloadBuilder) Cancel(cancel bool) *DataDownloadBuilder {
+	d.object.Spec.Cancel = cancel
+	return d
+}
+
+// OperationTimeout sets the DataDownload's OperationTimeout.
+func (d *DataDownloadBuilder) OperationTimeout(timeout metav1.Duration) *DataDownloadBuilder { + d.object.Spec.OperationTimeout = timeout + return d +} + +// DataMoverConfig sets the DataDownload's DataMoverConfig. +func (d *DataDownloadBuilder) DataMoverConfig(config *map[string]string) *DataDownloadBuilder { + d.object.Spec.DataMoverConfig = *config + return d +} diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 5e138b250c..3f635602ed 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -260,6 +260,10 @@ func (s *nodeAgentServer) run() { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } + if err = controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, repoEnsurer, credentialGetter, s.nodeName, s.logger).SetupWithManager(s.mgr); err != nil { + s.logger.WithError(err).Fatal("Unable to create the data download controller") + } + s.logger.Info("Controllers starting...") if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil { diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go new file mode 100644 index 0000000000..1f0e78c69b --- /dev/null +++ b/pkg/controller/data_download_controller.go @@ -0,0 +1,487 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package controller
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/utils/clock"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/builder"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
+	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+	"sigs.k8s.io/controller-runtime/pkg/source"
+
+	"github.com/vmware-tanzu/velero/internal/credentials"
+	"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
+	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
+	datamover "github.com/vmware-tanzu/velero/pkg/datamover"
+	"github.com/vmware-tanzu/velero/pkg/datapath"
+	"github.com/vmware-tanzu/velero/pkg/exposer"
+	repository "github.com/vmware-tanzu/velero/pkg/repository"
+	"github.com/vmware-tanzu/velero/pkg/uploader"
+	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
+	"github.com/vmware-tanzu/velero/pkg/util/kube"
+)
+
+// DataDownloadReconciler reconciles a DataDownload object
+type DataDownloadReconciler struct {
+	client            client.Client
+	kubeClient        kubernetes.Interface
+	logger            logrus.FieldLogger
+	credentialGetter  *credentials.CredentialGetter
+	fileSystem        filesystem.Interface
+	clock             clock.WithTickerAndDelayedExecution
+	restoreExposer    exposer.GenericRestoreExposer
+	nodeName          string
+	repositoryEnsurer *repository.Ensurer
+	dataPathMgr       *datapath.Manager
+}
+
+func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface,
+	repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, logger logrus.FieldLogger) *DataDownloadReconciler {
+	return &DataDownloadReconciler{
+		client:            client,
+		kubeClient:        kubeClient,
+		logger:            logger.WithField("controller", "DataDownload"),
+		credentialGetter:  credentialGetter,
+		fileSystem:        filesystem.NewFileSystem(),
+		clock:             &clock.RealClock{},
+		nodeName:          nodeName,
+		repositoryEnsurer: repoEnsurer,
+		restoreExposer:    exposer.NewGenericRestoreExposer(kubeClient, logger),
+		dataPathMgr:       datapath.NewManager(1),
+	}
+}
+
+// +kubebuilder:rbac:groups=velero.io,resources=datadownloads,verbs=get;list;watch;create;update;patch;delete
+// +kubebuilder:rbac:groups=velero.io,resources=datadownloads/status,verbs=get;update;patch
+// +kubebuilder:rbac:groups="",resources=pods,verbs=get
+// +kubebuilder:rbac:groups="",resources=persistentvolumes,verbs=get
+// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get
+
+func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+	log := r.logger.WithFields(logrus.Fields{
+		"controller":   "datadownload",
+		"datadownload": req.NamespacedName,
+	})
+
+	log.Infof("Reconcile %s", req.Name)
+
+	dd := &velerov2alpha1api.DataDownload{}
+	if err := r.client.Get(ctx, types.NamespacedName{Namespace: req.Namespace, Name: req.Name}, dd); err != nil {
+		if apierrors.IsNotFound(err) {
+			log.Warn("DataDownload not found, skip")
+			return ctrl.Result{}, nil
+		}
+		log.WithError(err).Error("Unable to get the DataDownload")
+		return ctrl.Result{}, err
+	}
+
+	if dd.Spec.DataMover != "" && dd.Spec.DataMover != dataMoverType {
+		log.WithField("data mover",
dd.Spec.DataMover).Info("Data mover is not the built-in data mover supported by Velero, skipping")
+		return ctrl.Result{}, nil
+	}
+
+	if r.restoreExposer == nil {
+		return r.errorOut(ctx, dd, errors.New("uninitialized generic exposer"), "uninitialized exposer", log)
+	}
+
+	if dd.Status.Phase == "" || dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew {
+		log.Info("Data download starting")
+
+		if _, err := r.getTargetPVC(ctx, dd); err != nil {
+			return ctrl.Result{Requeue: true}, nil
+		}
+
+		accepted, err := r.acceptDataDownload(ctx, dd)
+		if err != nil {
+			return r.errorOut(ctx, dd, err, "error to accept the data download", log)
+		}
+
+		if !accepted {
+			log.Debug("Data download is not accepted")
+			return ctrl.Result{}, nil
+		}
+
+		log.Info("Data download is accepted")
+
+		hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}
+
+		// restoreExposer.Expose() creates a pod whose volume is restored from the given volume snapshot.
+		// The pod may be scheduled to a node other than the one running the current controller, so we return here
+		// and only the controller on the same node as the pod carries out the rest of the work.
+		err = r.restoreExposer.Expose(ctx, getDataDownloadOwnerObject(dd), dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, hostingPodLabels, dd.Spec.OperationTimeout.Duration)
+		if err != nil {
+			return r.errorOut(ctx, dd, err, "error to start restore expose", log)
+		}
+		log.Info("Restore is exposed")
+
+		return ctrl.Result{}, nil
+	} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
+		log.Info("Data download is prepared")
+		fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
+
+		if fsRestore != nil {
+			log.Info("Cancellable data path is already started")
+			return ctrl.Result{}, nil
+		}
+
+		result, err := r.restoreExposer.GetExposed(ctx, getDataDownloadOwnerObject(dd), r.client, r.nodeName, dd.Spec.OperationTimeout.Duration)
+		if err != nil {
+			return r.errorOut(ctx, dd, err, "restore exposer is not ready", log)
+		} else if result == nil {
+			log.Debug("Get empty restore exposer")
+			return ctrl.Result{}, nil
+		}
+
+		log.Info("Restore PVC is ready")
+
+		// Update status to InProgress
+		original := dd.DeepCopy()
+		dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress
+		dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+		if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil {
+			log.WithError(err).Error("Unable to update status to in progress")
+			return ctrl.Result{}, err
+		}
+
+		log.Info("Data download is marked as in progress")
+
+		return r.runCancelableDataPath(ctx, dd, result, log)
+	} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
+		log.Info("Data download is in progress")
+		if dd.Spec.Cancel {
+			fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
+			if fsRestore == nil {
+				return ctrl.Result{}, nil
+			}
+
+			log.Info("Data download is being canceled")
+			// Update status to Canceling.
+ original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceling + if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating data download status") + return ctrl.Result{}, err + } + + fsRestore.Cancel() + return ctrl.Result{}, nil + } + + return ctrl.Result{}, nil + } else { + log.Debugf("Data download now is in %s phase and do nothing by current %s controller", dd.Status.Phase, r.nodeName) + return ctrl.Result{}, nil + } +} + +func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { + log.Info("Creating data path routine") + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataDownloadCompleted, + OnFailed: r.OnDataDownloadFailed, + OnCancelled: r.OnDataDownloadCancelled, + OnProgress: r.OnDataDownloadProgress, + } + + fsRestore, err := r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log) + if err != nil { + if err == datapath.ConcurrentLimitExceed { + log.Info("runCancelableDataDownload is concurrent limited") + return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil + } else { + return r.errorOut(ctx, dd, err, "error to create data path", log) + } + } + + path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.PVC, r.client, r.fileSystem, log) + if err != nil { + return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log) + } + + log.WithField("path", path.ByPath).Debug("Found host path") + if err := fsRestore.Init(ctx, dd.Spec.BackupStorageLocation, dd.Spec.SourceNamespace, datamover.GetUploaderType(dd.Spec.DataMover), + velerov1api.BackupRepositoryTypeKopia, "", r.repositoryEnsurer, r.credentialGetter); err != nil { + return r.errorOut(ctx, dd, err, "error to initialize data path", log) + } + log.WithField("path", path.ByPath).Info("fs init") + + if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path); err != nil { + return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting data path %s restore", path.ByPath), log) + } + + log.WithField("path", path.ByPath).Info("Async fs restore data path started") + return ctrl.Result{}, nil +} + +func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { + defer r.closeDataPath(ctx, ddName) + + log := r.logger.WithField("datadownload", ddName) + log.Info("Async fs restore data path completed") + + var dd velerov2alpha1api.DataDownload + if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil { + log.WithError(err).Warn("Failed to get datadownload on completion") + return + } + + objRef := getDataDownloadOwnerObject(&dd) + err := r.restoreExposer.RebindVolume(ctx, objRef, dd.Spec.TargetVolume.PVC, dd.Spec.TargetVolume.Namespace, dd.Spec.OperationTimeout.Duration) + if err != nil { + log.WithError(err).Error("Failed to rebind PV to target PVC on completion") + return + } + + log.Info("Cleaning up exposed environment") + r.restoreExposer.CleanUp(ctx, objRef) + + original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCompleted + dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating data download status") + } else { + log.Infof("Data 
download is marked as %s", dd.Status.Phase)
+	}
+}
+
+func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
+	defer r.closeDataPath(ctx, ddName)
+
+	log := r.logger.WithField("datadownload", ddName)
+
+	log.WithError(err).Error("Async fs restore data path failed")
+
+	var dd velerov2alpha1api.DataDownload
+	if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil {
+		log.WithError(getErr).Warn("Failed to get data download on failure")
+	} else {
+		if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); errOut != nil {
+			log.WithError(err).Warnf("Failed to patch data download with err %v", errOut)
+		}
+	}
+}
+
+func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
+	defer r.closeDataPath(ctx, ddName)
+
+	log := r.logger.WithField("datadownload", ddName)
+
+	log.Warn("Async fs restore data path canceled")
+
+	var dd velerov2alpha1api.DataDownload
+	if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil {
+		log.WithError(getErr).Warn("Failed to get datadownload on cancel")
+	} else {
+		// cleans up any objects generated during the restore expose
+		r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(&dd))
+
+		original := dd.DeepCopy()
+		dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled
+		if dd.Status.StartTimestamp.IsZero() {
+			dd.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()}
+		}
+		dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+		if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
+			log.WithError(err).Error("error updating data download status")
+		}
+	}
+}
+
+func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) {
+	log := r.logger.WithField("datadownload", ddName)
+
+	var dd velerov2alpha1api.DataDownload
+	if err := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); err != nil {
+		log.WithError(err).Warn("Failed to get data download on progress")
+		return
+	}
+
+	original := dd.DeepCopy()
+	dd.Status.Progress = shared.DataMoveOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
+
+	if err := r.client.Patch(ctx, &dd, client.MergeFrom(original)); err != nil {
+		log.WithError(err).Error("Failed to update restore snapshot progress")
+	}
+}
+
+// SetupWithManager registers the DataDownload controller.
+// A newly created DataDownload CR first triggers one of the datadownload controllers to create a restore pod, which may take a long time, fail, or stay in an
+// unknown status. The request therefore leaves the Reconcile queue immediately so that it does not block the handling of other CRs. To finish the rest of the
+// data download process, the request is re-enqueued once the related pod is running. The predicates below avoid handling unwanted pod statuses and avoid
+// blocking the handling of other CRs.
+func (r *DataDownloadReconciler) SetupWithManager(mgr ctrl.Manager) error {
+	return ctrl.NewControllerManagedBy(mgr).
+		For(&velerov2alpha1api.DataDownload{}).
+ Watches(&source.Kind{Type: &v1.Pod{}}, kube.EnqueueRequestsFromMapUpdateFunc(r.findSnapshotRestoreForPod), + builder.WithPredicates(predicate.Funcs{ + UpdateFunc: func(ue event.UpdateEvent) bool { + newObj := ue.ObjectNew.(*v1.Pod) + + if _, ok := newObj.Labels[velerov1api.DataDownloadLabel]; !ok { + return false + } + + if newObj.Status.Phase != v1.PodRunning { + return false + } + + if newObj.Spec.NodeName == "" { + return false + } + + return true + }, + CreateFunc: func(event.CreateEvent) bool { + return false + }, + DeleteFunc: func(de event.DeleteEvent) bool { + return false + }, + GenericFunc: func(ge event.GenericEvent) bool { + return false + }, + })). + Complete(r) +} + +func (r *DataDownloadReconciler) findSnapshotRestoreForPod(podObj client.Object) []reconcile.Request { + pod := podObj.(*v1.Pod) + + dd := &velerov2alpha1api.DataDownload{} + err := r.client.Get(context.Background(), types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Labels[velerov1api.DataDownloadLabel], + }, dd) + + if err != nil { + r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to get DataDownload") + return []reconcile.Request{} + } + + if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted { + return []reconcile.Request{} + } + + requests := make([]reconcile.Request, 1) + + r.logger.WithField("Restore pod", pod.Name).Infof("Preparing data download %s", dd.Name) + err = r.patchDataDownload(context.Background(), dd, prepareDataDownload) + if err != nil { + r.logger.WithField("Restore pod", pod.Name).WithError(err).Error("unable to patch data download") + return []reconcile.Request{} + } + + requests[0] = reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: dd.Namespace, + Name: dd.Name, + }, + } + + return requests +} + +func (r *DataDownloadReconciler) patchDataDownload(ctx context.Context, req *velerov2alpha1api.DataDownload, mutate func(*velerov2alpha1api.DataDownload)) error { + original := req.DeepCopy() + mutate(req) + if err := r.client.Patch(ctx, req, client.MergeFrom(original)); err != nil { + return errors.Wrap(err, "error patching data download") + } + + return nil +} + +func prepareDataDownload(ssb *velerov2alpha1api.DataDownload) { + ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared +} + +func (r *DataDownloadReconciler) errorOut(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { + r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) + return ctrl.Result{}, r.updateStatusToFailed(ctx, dd, err, msg, log) +} + +func (r *DataDownloadReconciler) updateStatusToFailed(ctx context.Context, dd *velerov2alpha1api.DataDownload, err error, msg string, log logrus.FieldLogger) error { + log.Infof("update data download status to %v", dd.Status.Phase) + original := dd.DeepCopy() + dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseFailed + dd.Status.Message = errors.WithMessage(err, msg).Error() + dd.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()} + + if err = r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating DataDownload status") + return err + } + + return nil +} + +func (r *DataDownloadReconciler) acceptDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload) (bool, error) { + updated := dd.DeepCopy() + updated.Status.Phase = velerov2alpha1api.DataDownloadPhaseAccepted + + r.logger.Infof("Accepting snapshot restore %s", dd.Name) + // For all data download 
controller in each node-agent will try to update download CR, and only one controller will success, + // and the success one could handle later logic + err := r.client.Update(ctx, updated) + if err == nil { + return true, nil + } else if apierrors.IsConflict(err) { + r.logger.WithField("DataDownload", dd.Name).Error("This data download restore has been accepted by others") + return false, nil + } else { + return false, err + } +} + +func (r *DataDownloadReconciler) getTargetPVC(ctx context.Context, dd *velerov2alpha1api.DataDownload) (*v1.PersistentVolumeClaim, error) { + return r.kubeClient.CoreV1().PersistentVolumeClaims(dd.Spec.TargetVolume.Namespace).Get(ctx, dd.Spec.TargetVolume.PVC, metav1.GetOptions{}) +} + +func (r *DataDownloadReconciler) closeDataPath(ctx context.Context, ddName string) { + fsBackup := r.dataPathMgr.GetAsyncBR(ddName) + if fsBackup != nil { + fsBackup.Close(ctx) + } + + r.dataPathMgr.RemoveAsyncBR(ddName) +} + +func getDataDownloadOwnerObject(dd *velerov2alpha1api.DataDownload) v1.ObjectReference { + return v1.ObjectReference{ + Kind: dd.Kind, + Namespace: dd.Namespace, + Name: dd.Name, + UID: dd.UID, + APIVersion: dd.APIVersion, + } +} diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go new file mode 100644 index 0000000000..5db1170c77 --- /dev/null +++ b/pkg/controller/data_download_controller_test.go @@ -0,0 +1,211 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "testing" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + clientgofake "k8s.io/client-go/kubernetes/fake" + ctrl "sigs.k8s.io/controller-runtime" + kbclient "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" + "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/exposer" + velerotest "github.com/vmware-tanzu/velero/pkg/test" + + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + datapathmockes "github.com/vmware-tanzu/velero/pkg/datapath/mocks" + exposermockes "github.com/vmware-tanzu/velero/pkg/exposer/mocks" +) + +const dataDownloadName string = "datadownload-1" + +func dataDownloadBuilder() *builder.DataDownloadBuilder { + return builder.ForDataDownload(velerov1api.DefaultNamespace, dataDownloadName). + BackupStorageLocation("bsl-loc"). + DataMover("velero"). 
+ SnapshotID("test-snapshot-id").TargetVolume(velerov2alpha1api.TargetVolumeSpec{ + PV: "test-pv", + PVC: "test-pvc", + Namespace: "test-ns", + }) +} + +func initDataDownloadReconciler(objects []runtime.Object, needError ...bool) (*DataDownloadReconciler, error) { + scheme := runtime.NewScheme() + err := velerov1api.AddToScheme(scheme) + if err != nil { + return nil, err + } + err = velerov2alpha1api.AddToScheme(scheme) + if err != nil { + return nil, err + } + err = corev1.AddToScheme(scheme) + if err != nil { + return nil, err + } + + fakeClient := &FakeClient{ + Client: fake.NewClientBuilder().WithScheme(scheme).Build(), + } + + if len(needError) == 4 { + fakeClient.getError = needError[0] + fakeClient.createError = needError[1] + fakeClient.updateError = needError[2] + fakeClient.patchError = needError[3] + } + + fakeKubeClient := clientgofake.NewSimpleClientset(objects...) + fakeFS := velerotest.NewFakeFileSystem() + pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", dataDownloadName) + _, err = fakeFS.Create(pathGlob) + if err != nil { + return nil, err + } + + credentialFileStore, err := credentials.NewNamespacedFileStore( + fakeClient, + velerov1api.DefaultNamespace, + "/tmp/credentials", + fakeFS, + ) + if err != nil { + return nil, err + } + return NewDataDownloadReconciler(fakeClient, fakeKubeClient, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test_node", velerotest.NewLogger()), nil +} + +func TestDataDownloadReconcile(t *testing.T) { + tests := []struct { + name string + dd *velerov2alpha1api.DataDownload + targetPVC *corev1.PersistentVolumeClaim + dataMgr *datapath.Manager + needErrs []bool + isExposeErr bool + isGetExposeErr bool + expectedStatusMsg string + }{ + { + name: "Restore is exposed", + dd: dataDownloadBuilder().Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + }, + { + name: "Get empty restore exposer", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + }, + { + name: "Failed to get restore exposer", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + expectedStatusMsg: "Error to get restore exposer", + isGetExposeErr: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + r, err := initDataDownloadReconciler([]runtime.Object{test.targetPVC}, test.needErrs...) 
+ require.NoError(t, err) + defer func() { + r.client.Delete(ctx, test.dd, &kbclient.DeleteOptions{}) + if test.targetPVC != nil { + r.client.Delete(ctx, test.targetPVC, &kbclient.DeleteOptions{}) + } + }() + + ctx := context.Background() + if test.dd.Namespace == velerov1api.DefaultNamespace { + err = r.client.Create(ctx, test.dd) + require.NoError(t, err) + } + + if test.dataMgr != nil { + r.dataPathMgr = test.dataMgr + } else { + r.dataPathMgr = datapath.NewManager(1) + } + + datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return datapathmockes.NewAsyncBR(t) + } + + if test.isExposeErr || test.isGetExposeErr { + r.restoreExposer = func() exposer.GenericRestoreExposer { + ep := exposermockes.NewGenericRestoreExposer(t) + if test.isExposeErr { + ep.On("Expose", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(errors.New("Error to expose restore exposer")) + } + + if test.isGetExposeErr { + ep.On("GetExposed", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("Error to get restore exposer")) + } + + ep.On("CleanUp", mock.Anything, mock.Anything).Return() + return ep + }() + } + + if test.dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { + if fsBR := r.dataPathMgr.GetAsyncBR(test.dd.Name); fsBR == nil { + _, err := r.dataPathMgr.CreateFileSystemBR(test.dd.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, velerotest.NewLogger()) + require.NoError(t, err) + } + } + actualResult, err := r.Reconcile(ctx, ctrl.Request{ + NamespacedName: types.NamespacedName{ + Namespace: velerov1api.DefaultNamespace, + Name: test.dd.Name, + }, + }) + + require.Nil(t, err) + require.NotNil(t, actualResult) + + dd := velerov2alpha1api.DataDownload{} + err = r.client.Get(ctx, kbclient.ObjectKey{ + Name: test.dd.Name, + Namespace: test.dd.Namespace, + }, &dd) + + if test.isGetExposeErr { + assert.Contains(t, dd.Status.Message, test.expectedStatusMsg) + } + require.Nil(t, err) + t.Logf("%s: \n %v \n", test.name, dd) + }) + } +} diff --git a/pkg/datapath/mocks/types.go b/pkg/datapath/mocks/types.go new file mode 100644 index 0000000000..ecf655df01 --- /dev/null +++ b/pkg/datapath/mocks/types.go @@ -0,0 +1,86 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + credentials "github.com/vmware-tanzu/velero/internal/credentials" + datapath "github.com/vmware-tanzu/velero/pkg/datapath" + + mock "github.com/stretchr/testify/mock" + + repository "github.com/vmware-tanzu/velero/pkg/repository" +) + +// AsyncBR is an autogenerated mock type for the AsyncBR type +type AsyncBR struct { + mock.Mock +} + +// Cancel provides a mock function with given fields: +func (_m *AsyncBR) Cancel() { + _m.Called() +} + +// Close provides a mock function with given fields: ctx +func (_m *AsyncBR) Close(ctx context.Context) { + _m.Called(ctx) +} + +// Init provides a mock function with given fields: ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter +func (_m *AsyncBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { + ret := _m.Called(ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, string, *repository.Ensurer, *credentials.CredentialGetter) error); ok { + r0 = rf(ctx, bslName, sourceNamespace, uploaderType, repositoryType, repoIdentifier, repositoryEnsurer, credentialGetter) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StartBackup provides a mock function with given fields: source, realSource, parentSnapshot, forceFull, tags +func (_m *AsyncBR) StartBackup(source datapath.AccessPoint, realSource string, parentSnapshot string, forceFull bool, tags map[string]string) error { + ret := _m.Called(source, realSource, parentSnapshot, forceFull, tags) + + var r0 error + if rf, ok := ret.Get(0).(func(datapath.AccessPoint, string, string, bool, map[string]string) error); ok { + r0 = rf(source, realSource, parentSnapshot, forceFull, tags) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// StartRestore provides a mock function with given fields: snapshotID, target +func (_m *AsyncBR) StartRestore(snapshotID string, target datapath.AccessPoint) error { + ret := _m.Called(snapshotID, target) + + var r0 error + if rf, ok := ret.Get(0).(func(string, datapath.AccessPoint) error); ok { + r0 = rf(snapshotID, target) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewAsyncBR interface { + mock.TestingT + Cleanup(func()) +} + +// NewAsyncBR creates a new instance of AsyncBR. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewAsyncBR(t mockConstructorTestingTNewAsyncBR) *AsyncBR { + mock := &AsyncBR{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/exposer/mocks/generic_restore.go b/pkg/exposer/mocks/generic_restore.go new file mode 100644 index 0000000000..a7d20f87c6 --- /dev/null +++ b/pkg/exposer/mocks/generic_restore.go @@ -0,0 +1,96 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + client "sigs.k8s.io/controller-runtime/pkg/client" + + exposer "github.com/vmware-tanzu/velero/pkg/exposer" + + mock "github.com/stretchr/testify/mock" + + time "time" + + v1 "k8s.io/api/core/v1" +) + +// GenericRestoreExposer is an autogenerated mock type for the GenericRestoreExposer type +type GenericRestoreExposer struct { + mock.Mock +} + +// CleanUp provides a mock function with given fields: _a0, _a1 +func (_m *GenericRestoreExposer) CleanUp(_a0 context.Context, _a1 v1.ObjectReference) { + _m.Called(_a0, _a1) +} + +// Expose provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4, _a5 +func (_m *GenericRestoreExposer) Expose(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 map[string]string, _a5 time.Duration) error { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4, _a5) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, map[string]string, time.Duration) error); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4, _a5) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetExposed provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *GenericRestoreExposer) GetExposed(_a0 context.Context, _a1 v1.ObjectReference, _a2 client.Client, _a3 string, _a4 time.Duration) (*exposer.ExposeResult, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + var r0 *exposer.ExposeResult + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) (*exposer.ExposeResult, error)); ok { + return rf(_a0, _a1, _a2, _a3, _a4) + } + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) *exposer.ExposeResult); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*exposer.ExposeResult) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, v1.ObjectReference, client.Client, string, time.Duration) error); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RebindVolume provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *GenericRestoreExposer) RebindVolume(_a0 context.Context, _a1 v1.ObjectReference, _a2 string, _a3 string, _a4 time.Duration) error { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, v1.ObjectReference, string, string, time.Duration) error); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewGenericRestoreExposer interface { + mock.TestingT + Cleanup(func()) +} + +// NewGenericRestoreExposer creates a new instance of GenericRestoreExposer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewGenericRestoreExposer(t mockConstructorTestingTNewGenericRestoreExposer) *GenericRestoreExposer { + mock := &GenericRestoreExposer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}
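For illustration only (this sketch is not part of the diff above): one way the new DataDownloadBuilder could be used to seed a DataDownload CR for the DataDownloadReconciler, mirroring dataDownloadBuilder() in the tests. The package name controller_test, the helper name newExampleDataDownload, and field values such as "bsl-loc" and "test-snapshot-id" are placeholders.

// Illustrative sketch, not taken from the diff above: builds a DataDownload CR
// with the new DataDownloadBuilder; names and values are placeholders.
package controller_test

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
	velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
	"github.com/vmware-tanzu/velero/pkg/builder"
)

// newExampleDataDownload returns a DataDownload in the New (empty) phase.
func newExampleDataDownload() *velerov2alpha1api.DataDownload {
	return builder.ForDataDownload(velerov1api.DefaultNamespace, "datadownload-1").
		BackupStorageLocation("bsl-loc").
		DataMover("velero").
		SnapshotID("test-snapshot-id").
		OperationTimeout(metav1.Duration{Duration: 10 * time.Minute}).
		TargetVolume(velerov2alpha1api.TargetVolumeSpec{
			PV:        "test-pv",
			PVC:       "test-pvc",
			Namespace: "test-ns",
		}).
		Result()
}

The reconciler added in this PR then drives such a CR from the New phase through Accepted, Prepared, and InProgress, as shown in Reconcile above.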