From 623da51494fb83819552bd7257c7668bcde3c489 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 27 Apr 2023 11:19:03 +0800 Subject: [PATCH 1/2] add shared generic data path Signed-off-by: Lyndon-Li --- changelogs/unreleased/6226-Lyndon-Li | 1 + pkg/cmd/cli/nodeagent/server.go | 55 ++-- .../pod_volume_backup_controller.go | 259 +++++++++++------- .../pod_volume_backup_controller_test.go | 103 ++++--- .../pod_volume_restore_controller.go | 238 +++++++++------- pkg/datapath/file_system.go | 195 +++++++++++++ pkg/datapath/file_system_test.go | 197 +++++++++++++ pkg/datapath/manager.go | 77 ++++++ pkg/datapath/manager_test.go | 52 ++++ pkg/datapath/types.go | 74 +++++ pkg/exposer/host_path.go | 61 +++++ pkg/exposer/host_path_test.go | 81 ++++++ pkg/uploader/kopia/snapshot.go | 46 +++- pkg/uploader/kopia/snapshot_test.go | 2 +- pkg/uploader/provider/kopia.go | 55 ++-- pkg/uploader/provider/kopia_test.go | 10 +- pkg/uploader/provider/mocks/Provider.go | 90 ++++++ pkg/uploader/provider/provider.go | 24 +- pkg/uploader/provider/restic.go | 1 + pkg/uploader/provider/restic_test.go | 2 +- pkg/uploader/types.go | 5 +- pkg/util/kube/utils.go | 4 +- 22 files changed, 1317 insertions(+), 315 deletions(-) create mode 100644 changelogs/unreleased/6226-Lyndon-Li create mode 100644 pkg/datapath/file_system.go create mode 100644 pkg/datapath/file_system_test.go create mode 100644 pkg/datapath/manager.go create mode 100644 pkg/datapath/manager_test.go create mode 100644 pkg/datapath/types.go create mode 100644 pkg/exposer/host_path.go create mode 100644 pkg/exposer/host_path_test.go create mode 100644 pkg/uploader/provider/mocks/Provider.go diff --git a/changelogs/unreleased/6226-Lyndon-Li b/changelogs/unreleased/6226-Lyndon-Li new file mode 100644 index 0000000000..7794b79d0f --- /dev/null +++ b/changelogs/unreleased/6226-Lyndon-Li @@ -0,0 +1 @@ +Add code change for async generic data path that is used by both PVB/PVR and data mover \ No newline at end of file diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 6b3eed3b03..df94707596 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -36,7 +36,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/kubernetes" - clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -51,6 +50,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/cmd/util/signals" "github.com/vmware-tanzu/velero/pkg/controller" "github.com/vmware-tanzu/velero/pkg/metrics" + "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" ) @@ -66,11 +66,22 @@ const ( // defaultCredentialsDirectory is the path on disk where credential // files will be written to defaultCredentialsDirectory = "/tmp/credentials" + + defaultResourceTimeout = 10 * time.Minute ) +type nodeAgentServerConfig struct { + metricsAddress string + resourceTimeout time.Duration +} + func NewServerCommand(f client.Factory) *cobra.Command { logLevelFlag := logging.LogLevelFlag(logrus.InfoLevel) formatFlag := logging.NewFormatFlag() + config := nodeAgentServerConfig{ + metricsAddress: defaultMetricsAddress, + resourceTimeout: defaultResourceTimeout, + } command := &cobra.Command{ Use: "server", @@ -85,7 +96,7 @@ func NewServerCommand(f client.Factory) *cobra.Command { logger.Infof("Starting Velero node-agent server %s (%s)", 
buildinfo.Version, buildinfo.FormattedGitSHA()) f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name())) - s, err := newNodeAgentServer(logger, f, defaultMetricsAddress) + s, err := newNodeAgentServer(logger, f, config) cmd.CheckError(err) s.run() @@ -94,6 +105,7 @@ func NewServerCommand(f client.Factory) *cobra.Command { command.Flags().Var(logLevelFlag, "log-level", fmt.Sprintf("The level at which to log. Valid values are %s.", strings.Join(logLevelFlag.AllowedValues(), ", "))) command.Flags().Var(formatFlag, "log-format", fmt.Sprintf("The format for log output. Valid values are %s.", strings.Join(formatFlag.AllowedValues(), ", "))) + command.Flags().DurationVar(&config.resourceTimeout, "resource-timeout", config.resourceTimeout, "How long to wait for resource processes which are not covered by other specific timeout parameters. Default is 10 minutes.") return command } @@ -108,9 +120,10 @@ type nodeAgentServer struct { metricsAddress string namespace string nodeName string + config nodeAgentServerConfig } -func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, metricAddress string) (*nodeAgentServer, error) { +func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, config nodeAgentServerConfig) (*nodeAgentServer, error) { ctx, cancelFunc := context.WithCancel(context.Background()) clientConfig, err := factory.ClientConfig() @@ -145,14 +158,14 @@ func newNodeAgentServer(logger logrus.FieldLogger, factory client.Factory, metri } s := &nodeAgentServer{ - logger: logger, - ctx: ctx, - cancelFunc: cancelFunc, - fileSystem: filesystem.NewFileSystem(), - mgr: mgr, - metricsAddress: metricAddress, - namespace: factory.Namespace(), - nodeName: nodeName, + logger: logger, + ctx: ctx, + cancelFunc: cancelFunc, + fileSystem: filesystem.NewFileSystem(), + mgr: mgr, + config: config, + namespace: factory.Namespace(), + nodeName: nodeName, } // the cache isn't initialized yet when "validatePodVolumesHostPath" is called, the client returned by the manager cannot @@ -208,21 +221,15 @@ func (s *nodeAgentServer) run() { } credentialGetter := &credentials.CredentialGetter{FromFile: credentialFileStore, FromSecret: credSecretStore} - pvbReconciler := controller.PodVolumeBackupReconciler{ - Scheme: s.mgr.GetScheme(), - Client: s.mgr.GetClient(), - Clock: clocks.RealClock{}, - Metrics: s.metrics, - CredentialGetter: credentialGetter, - NodeName: s.nodeName, - FileSystem: filesystem.NewFileSystem(), - Log: s.logger, - } + repoEnsurer := repository.NewEnsurer(s.mgr.GetClient(), s.logger, s.config.resourceTimeout) + pvbReconciler := controller.NewPodVolumeBackupReconciler(s.mgr.GetClient(), repoEnsurer, + credentialGetter, s.nodeName, s.mgr.GetScheme(), s.metrics, s.logger) + if err := pvbReconciler.SetupWithManager(s.mgr); err != nil { s.logger.Fatal(err, "unable to create controller", "controller", controller.PodVolumeBackup) } - if err = controller.NewPodVolumeRestoreReconciler(s.logger, s.mgr.GetClient(), credentialGetter).SetupWithManager(s.mgr); err != nil { + if err = controller.NewPodVolumeRestoreReconciler(s.mgr.GetClient(), repoEnsurer, credentialGetter, s.logger).SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the pod volume restore controller") } @@ -313,7 +320,7 @@ func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) { if err := controller.UpdatePVBStatusToFailed(s.ctx, client, &pvbs.Items[i], fmt.Sprintf("get a podvolumebackup with status %q during the server starting, mark it as %q", 
velerov1api.PodVolumeBackupPhaseInProgress, velerov1api.PodVolumeBackupPhaseFailed), - time.Now()); err != nil { + time.Now(), s.logger); err != nil { s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumebackup %q", pvb.GetName()) continue } @@ -349,7 +356,7 @@ func (s *nodeAgentServer) markInProgressPVRsFailed(client ctrlclient.Client) { if err := controller.UpdatePVRStatusToFailed(s.ctx, client, &pvrs.Items[i], fmt.Sprintf("get a podvolumerestore with status %q during the server starting, mark it as %q", velerov1api.PodVolumeRestorePhaseInProgress, velerov1api.PodVolumeRestorePhaseFailed), - time.Now()); err != nil { + time.Now(), s.logger); err != nil { s.logger.WithError(errors.WithStack(err)).Errorf("failed to patch podvolumerestore %q", pvr.GetName()) continue } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 61efe7c10c..7c57d7bde0 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -27,49 +27,60 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" clocks "k8s.io/utils/clock" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/podvolume" "github.com/vmware-tanzu/velero/pkg/repository" - repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/uploader/provider" "github.com/vmware-tanzu/velero/pkg/util/filesystem" - "github.com/vmware-tanzu/velero/pkg/util/kube" ) -// NewUploaderProviderFunc is used for unit test to mock function -var NewUploaderProviderFunc = provider.NewUploaderProvider +const pVBRRequestor string = "pod-volume-backup-restore" + +// NewPodVolumeBackupReconciler creates the PodVolumeBackupReconciler instance +func NewPodVolumeBackupReconciler(client client.Client, ensurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, + nodeName string, scheme *runtime.Scheme, metrics *metrics.ServerMetrics, logger logrus.FieldLogger) *PodVolumeBackupReconciler { + return &PodVolumeBackupReconciler{ + Client: client, + logger: logger.WithField("controller", "PodVolumeBackup"), + repositoryEnsurer: ensurer, + credentialGetter: credentialGetter, + nodeName: nodeName, + fileSystem: filesystem.NewFileSystem(), + clock: &clocks.RealClock{}, + scheme: scheme, + metrics: metrics, + dataPathMgr: datapath.NewManager(1), + } +} // PodVolumeBackupReconciler reconciles a PodVolumeBackup object type PodVolumeBackupReconciler struct { - Scheme *runtime.Scheme - Client client.Client - Clock clocks.WithTickerAndDelayedExecution - Metrics *metrics.ServerMetrics - CredentialGetter *credentials.CredentialGetter - NodeName string - FileSystem filesystem.Interface - Log logrus.FieldLogger -} - -type BackupProgressUpdater struct { - PodVolumeBackup *velerov1api.PodVolumeBackup - Log logrus.FieldLogger - Ctx context.Context - Cli client.Client + client.Client + scheme *runtime.Scheme + clock clocks.WithTickerAndDelayedExecution + metrics *metrics.ServerMetrics + credentialGetter *credentials.CredentialGetter + 
repositoryEnsurer *repository.Ensurer + nodeName string + fileSystem filesystem.Interface + logger logrus.FieldLogger + dataPathMgr *datapath.Manager } // +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=velero.io,resources=podvolumebackups/status,verbs=get;update;patch func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := r.Log.WithFields(logrus.Fields{ + log := r.logger.WithFields(logrus.Fields{ "controller": "podvolumebackup", "podvolumebackup": req.NamespacedName, }) @@ -92,7 +103,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ log.Info("PodVolumeBackup starting") // Only process items for this node. - if pvb.Spec.Node != r.NodeName { + if pvb.Spec.Node != r.nodeName { return ctrl.Result{}, nil } @@ -104,15 +115,30 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, nil } - r.Metrics.RegisterPodVolumeBackupEnqueue(r.NodeName) + callbacks := datapath.Callbacks{ + OnCompleted: r.OnDataPathCompleted, + OnFailed: r.OnDataPathFailed, + OnCancelled: r.OnDataPathCancelled, + OnProgress: r.OnDataPathProgress, + } + + fsBackup, err := r.dataPathMgr.CreateFileSystemBR(pvb.Name, pVBRRequestor, ctx, r.Client, pvb.Namespace, callbacks, log) + if err != nil { + if err == datapath.ConcurrentLimitExceed { + return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil + } else { + return r.errorOut(ctx, &pvb, err, "error to create data path", log) + } + } + + r.metrics.RegisterPodVolumeBackupEnqueue(r.nodeName) // Update status to InProgress. original := pvb.DeepCopy() pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseInProgress - pvb.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} + pvb.Status.StartTimestamp = &metav1.Time{Time: r.clock.Now()} if err := r.Client.Patch(ctx, &pvb, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("error updating PodVolumeBackup status") - return ctrl.Result{}, err + return r.errorOut(ctx, &pvb, err, "error updating PodVolumeBackup status", log) } var pod corev1.Pod @@ -121,45 +147,19 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ Name: pvb.Spec.Pod.Name, } if err := r.Client.Get(ctx, podNamespacedName, &pod); err != nil { - return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log) + return r.errorOut(ctx, &pvb, err, fmt.Sprintf("getting pod %s/%s", pvb.Spec.Pod.Namespace, pvb.Spec.Pod.Name), log) } - volDir, err := kube.GetVolumeDirectory(ctx, log, &pod, pvb.Spec.Volume, r.Client) + path, err := exposer.GetPodVolumeHostPath(ctx, &pod, pvb.Spec.Volume, r.Client, r.fileSystem, log) if err != nil { - return r.updateStatusToFailed(ctx, &pvb, err, "getting volume directory name", log) + return r.errorOut(ctx, &pvb, err, "error exposing host path for pod volume", log) } - pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pvb.Spec.Pod.UID), volDir) - log.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob") - - path, err := kube.SinglePathMatch(pathGlob, r.FileSystem, log) - if err != nil { - return r.updateStatusToFailed(ctx, &pvb, err, "identifying unique volume path on host", log) - } - log.WithField("path", path).Debugf("Found path matching glob") - - backupLocation := &velerov1api.BackupStorageLocation{} - if err := r.Client.Get(context.Background(), 
client.ObjectKey{ - Namespace: pvb.Namespace, - Name: pvb.Spec.BackupStorageLocation, - }, backupLocation); err != nil { - return ctrl.Result{}, errors.Wrap(err, "error getting backup storage location") - } - - backupRepo, err := repository.GetBackupRepository(ctx, r.Client, pvb.Namespace, repository.BackupRepositoryKey{ - VolumeNamespace: pvb.Spec.Pod.Namespace, - BackupLocation: pvb.Spec.BackupStorageLocation, - RepositoryType: podvolume.GetPvbRepositoryType(&pvb), - }) - if err != nil { - return ctrl.Result{}, errors.Wrap(err, "error getting backup repository") - } + log.WithField("path", path.ByPath).Debugf("Found host path") - var uploaderProv provider.Provider - uploaderProv, err = NewUploaderProviderFunc(ctx, r.Client, pvb.Spec.UploaderType, pvb.Spec.RepoIdentifier, - backupLocation, backupRepo, r.CredentialGetter, repokey.RepoKeySelector(), log) - if err != nil { - return r.updateStatusToFailed(ctx, &pvb, err, "error creating uploader", log) + if err := fsBackup.Init(ctx, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType, + podvolume.GetPvbRepositoryType(&pvb), r.repositoryEnsurer, r.credentialGetter); err != nil { + return r.errorOut(ctx, &pvb, err, "error to initialize data path", log) } // If this is a PVC, look for the most recent completed pod volume backup for it and get @@ -177,41 +177,98 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } } - defer func() { - if err := uploaderProv.Close(ctx); err != nil { - log.Errorf("failed to close uploader provider with error %v", err) - } - }() + if err := fsBackup.StartBackup(path, parentSnapshotID, false, pvb.Spec.Tags); err != nil { + return r.errorOut(ctx, &pvb, err, "error starting data path backup", log) + } - snapshotID, emptySnapshot, err := uploaderProv.RunBackup(ctx, path, pvb.Spec.Tags, parentSnapshotID, r.NewBackupProgressUpdater(ctx, &pvb, log)) - if err != nil { - return r.updateStatusToFailed(ctx, &pvb, err, fmt.Sprintf("running backup, stderr=%v", err), log) + log.WithField("path", path.ByPath).Info("Async fs backup data path started") + + return ctrl.Result{}, nil +} + +func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) { + defer r.closeDataPath(ctx, pvbName) + + log := r.logger.WithField("pvb", pvbName) + + log.WithField("PVB", pvbName).Info("Async fs backup data path completed") + + var pvb velerov1api.PodVolumeBackup + if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil { + log.WithError(err).Warn("Failed to get PVB on completion") + return } // Update status to Completed with path & snapshot ID. 
-	original = pvb.DeepCopy()
-	pvb.Status.Path = path
+	original := pvb.DeepCopy()
+	pvb.Status.Path = result.Backup.Source.ByPath
 	pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted
-	pvb.Status.SnapshotID = snapshotID
-	pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()}
-	if emptySnapshot {
+	pvb.Status.SnapshotID = result.Backup.SnapshotID
+	pvb.Status.CompletionTimestamp = &metav1.Time{Time: r.clock.Now()}
+	if result.Backup.EmptySnapshot {
 		pvb.Status.Message = "volume was empty so no snapshot was taken"
 	}
-	if err = r.Client.Patch(ctx, &pvb, client.MergeFrom(original)); err != nil {
+
+	if err := r.Client.Patch(ctx, &pvb, client.MergeFrom(original)); err != nil {
 		log.WithError(err).Error("error updating PodVolumeBackup status")
-		return ctrl.Result{}, err
 	}
 
 	latencyDuration := pvb.Status.CompletionTimestamp.Time.Sub(pvb.Status.StartTimestamp.Time)
 	latencySeconds := float64(latencyDuration / time.Second)
-	backupName := fmt.Sprintf("%s/%s", req.Namespace, pvb.OwnerReferences[0].Name)
-	generateOpName := fmt.Sprintf("%s-%s-%s-%s-%s-backup", pvb.Name, backupRepo.Name, pvb.Spec.BackupStorageLocation, pvb.Namespace, pvb.Spec.UploaderType)
-	r.Metrics.ObservePodVolumeOpLatency(r.NodeName, req.Name, generateOpName, backupName, latencySeconds)
-	r.Metrics.RegisterPodVolumeOpLatencyGauge(r.NodeName, req.Name, generateOpName, backupName, latencySeconds)
-	r.Metrics.RegisterPodVolumeBackupDequeue(r.NodeName)
+	backupName := fmt.Sprintf("%s/%s", pvb.Namespace, pvb.OwnerReferences[0].Name)
+	generateOpName := fmt.Sprintf("%s-%s-%s-%s-backup", pvb.Name, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType)
+	r.metrics.ObservePodVolumeOpLatency(r.nodeName, pvb.Name, generateOpName, backupName, latencySeconds)
+	r.metrics.RegisterPodVolumeOpLatencyGauge(r.nodeName, pvb.Name, generateOpName, backupName, latencySeconds)
+	r.metrics.RegisterPodVolumeBackupDequeue(r.nodeName)
 
 	log.Info("PodVolumeBackup completed")
-	return ctrl.Result{}, nil
+}
+
+func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvbName string, err error) {
+	defer r.closeDataPath(ctx, pvbName)
+
+	log := r.logger.WithField("pvb", pvbName)
+
+	log.WithError(err).Error("Async fs backup data path failed")
+
+	var pvb velerov1api.PodVolumeBackup
+	if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil {
+		log.WithError(getErr).Warn("Failed to get PVB on failure")
+	} else {
+		_, _ = r.errorOut(ctx, &pvb, err, "data path backup failed", log)
+	}
+}
+
+func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) {
+	defer r.closeDataPath(ctx, pvbName)
+
+	log := r.logger.WithField("pvb", pvbName)
+
+	log.Warn("Async fs backup data path canceled")
+
+	var pvb velerov1api.PodVolumeBackup
+	if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil {
+		log.WithError(getErr).Warn("Failed to get PVB on cancel")
+	} else {
+		_, _ = r.errorOut(ctx, &pvb, errors.New("PVB is canceled"), "data path backup canceled", log)
+	}
+}
+
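+// OnDataPathProgress is the progress hook of datapath.Callbacks. Unlike the
+// terminal callbacks above, it fires repeatedly while the backup is still
+// running, so it only patches the progress counters and never closes the data path.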
+func (r *PodVolumeBackupReconciler) OnDataPathProgress(ctx context.Context, namespace string, pvbName string, progress *uploader.Progress) {
+	log := r.logger.WithField("pvb", pvbName)
+
+	var pvb velerov1api.PodVolumeBackup
+	if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil {
+		log.WithError(err).Warn("Failed to get PVB on progress")
+		return
+	}
+
+	original := pvb.DeepCopy()
+	pvb.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
+
+	if err := r.Client.Patch(ctx, &pvb, client.MergeFrom(original)); err != nil {
+		log.WithError(err).Error("Failed to update progress")
+	}
 }
 
 // SetupWithManager registers the PVB controller.
@@ -279,36 +336,32 @@ func (r *PodVolumeBackupReconciler) getParentSnapshot(ctx context.Context, log l
 	return mostRecentPVB.Status.SnapshotID
 }
 
-func (r *PodVolumeBackupReconciler) updateStatusToFailed(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
-	if err = UpdatePVBStatusToFailed(ctx, r.Client, pvb, errors.WithMessage(err, msg).Error(), r.Clock.Now()); err != nil {
-		log.WithError(err).Error("error updating PodVolumeBackup status")
-		return ctrl.Result{}, err
+func (r *PodVolumeBackupReconciler) closeDataPath(ctx context.Context, pvbName string) {
+	fsBackup := r.dataPathMgr.GetAsyncBR(pvbName)
+	if fsBackup != nil {
+		fsBackup.Close(ctx)
 	}
-	return ctrl.Result{}, nil
+
+	r.dataPathMgr.RemoveAsyncBR(pvbName)
+}
+
+func (r *PodVolumeBackupReconciler) errorOut(ctx context.Context, pvb *velerov1api.PodVolumeBackup, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) {
+	r.closeDataPath(ctx, pvb.Name)
+	_ = UpdatePVBStatusToFailed(ctx, r.Client, pvb, errors.WithMessage(err, msg).Error(), r.clock.Now(), log)
+
+	return ctrl.Result{}, err
 }
 
-func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errString string, time time.Time) error {
+func UpdatePVBStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeBackup, errString string, time time.Time, log logrus.FieldLogger) error {
 	original := pvb.DeepCopy()
 	pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseFailed
 	pvb.Status.Message = errString
 	pvb.Status.CompletionTimestamp = &metav1.Time{Time: time}
 
-	return c.Patch(ctx, pvb, client.MergeFrom(original))
-}
-
-func (r *PodVolumeBackupReconciler) NewBackupProgressUpdater(ctx context.Context, pvb *velerov1api.PodVolumeBackup, log logrus.FieldLogger) *BackupProgressUpdater {
-	return &BackupProgressUpdater{pvb, log, ctx, r.Client}
-}
-
-// UpdateProgress which implement ProgressUpdater interface to update pvb progress status
-func (b *BackupProgressUpdater) UpdateProgress(p *uploader.Progress) {
-	original := b.PodVolumeBackup.DeepCopy()
-	b.PodVolumeBackup.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
-	if b.Cli == nil {
-		b.Log.Errorf("failed to update backup pod %s volume %s progress with uninitailize client", b.PodVolumeBackup.Spec.Pod.Name, b.PodVolumeBackup.Spec.Volume)
-		return
-	}
-	if err := b.Cli.Patch(b.Ctx, b.PodVolumeBackup, client.MergeFrom(original)); err != nil {
-		b.Log.Errorf("update backup pod %s volume %s progress with %v", b.PodVolumeBackup.Spec.Pod.Name, b.PodVolumeBackup.Spec.Volume, err)
+	if err := c.Patch(ctx, pvb, client.MergeFrom(original)); err != nil {
+		log.WithError(err).Error("error updating PodVolumeBackup status")
+		return err
 	}
+
+	return nil
 }
diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go
index f940f67e3f..ede8aa9d14 100644
--- a/pkg/controller/pod_volume_backup_controller_test.go
+++ b/pkg/controller/pod_volume_backup_controller_test.go
@@ -30,6 +30,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/clock" testclocks "k8s.io/utils/clock/testing" ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -38,10 +39,10 @@ import ( "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/builder" + "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/metrics" + "github.com/vmware-tanzu/velero/pkg/repository" velerotest "github.com/vmware-tanzu/velero/pkg/test" - "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/uploader/provider" ) const name = "pvb-1" @@ -92,6 +93,38 @@ func buildBackupRepo() *velerov1api.BackupRepository { } } +type fakeFSBR struct { + pvb *velerov1api.PodVolumeBackup + client kbclient.Client + clock clock.WithTickerAndDelayedExecution +} + +func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { + return nil +} + +func (b *fakeFSBR) StartBackup(source datapath.AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { + pvb := b.pvb + + original := b.pvb.DeepCopy() + pvb.Status.Phase = velerov1api.PodVolumeBackupPhaseCompleted + pvb.Status.CompletionTimestamp = &metav1.Time{Time: b.clock.Now()} + + b.client.Patch(ctx, pvb, kbclient.MergeFrom(original)) + + return nil +} + +func (b *fakeFSBR) StartRestore(snapshotID string, target datapath.AccessPoint) error { + return nil +} + +func (b *fakeFSBR) Cancel() { +} + +func (b *fakeFSBR) Close(ctx context.Context) { +} + var _ = Describe("PodVolumeBackup Reconciler", func() { type request struct { pvb *velerov1api.PodVolumeBackup @@ -102,6 +135,7 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { expected *velerov1api.PodVolumeBackup expectedRequeue ctrl.Result expectedErrMsg string + dataMgr *datapath.Manager } // `now` will be used to set the fake clock's time; capture @@ -141,20 +175,31 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Expect(err).To(BeNil()) + if test.dataMgr == nil { + test.dataMgr = datapath.NewManager(1) + } + + datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + return &fakeFSBR{ + pvb: test.pvb, + client: fakeClient, + clock: testclocks.NewFakeClock(now), + } + } + // Setup reconciler Expect(velerov1api.AddToScheme(scheme.Scheme)).To(Succeed()) r := PodVolumeBackupReconciler{ Client: fakeClient, - Clock: testclocks.NewFakeClock(now), - Metrics: metrics.NewPodVolumeMetrics(), - CredentialGetter: &credentials.CredentialGetter{FromFile: credentialFileStore}, - NodeName: "test_node", - FileSystem: fakeFS, - Log: velerotest.NewLogger(), - } - NewUploaderProviderFunc = func(ctx context.Context, client kbclient.Client, uploaderType, repoIdentifier string, bsl *velerov1api.BackupStorageLocation, backupRepo *velerov1api.BackupRepository, credGetter *credentials.CredentialGetter, repoKeySelector *corev1.SecretKeySelector, log logrus.FieldLogger) (provider.Provider, error) { - return &fakeProvider{}, nil + clock: testclocks.NewFakeClock(now), + metrics: metrics.NewPodVolumeMetrics(), + credentialGetter: &credentials.CredentialGetter{FromFile: credentialFileStore}, + nodeName: "test_node", + fileSystem: fakeFS, + logger: velerotest.NewLogger(), + 
dataPathMgr: test.dataMgr, } + actualResult, err := r.Reconcile(ctx, ctrl.Request{ NamespacedName: types.NamespacedName{ Namespace: velerov1api.DefaultNamespace, @@ -328,29 +373,17 @@ var _ = Describe("PodVolumeBackup Reconciler", func() { Result(), expectedRequeue: ctrl.Result{}, }), + Entry("pvb should be requeued when exceeding max concurrent number", request{ + pvb: pvbBuilder().Phase("").Node("test_node").Result(), + pod: podBuilder().Result(), + bsl: bslBuilder().Result(), + backupRepo: buildBackupRepo(), + dataMgr: datapath.NewManager(0), + expectedProcessed: false, + expected: builder.ForPodVolumeBackup(velerov1api.DefaultNamespace, "pvb-1"). + Phase(""). + Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, + }), ) }) - -type fakeProvider struct { -} - -func (f *fakeProvider) RunBackup( - ctx context.Context, - path string, - tags map[string]string, - parentSnapshot string, - updater uploader.ProgressUpdater) (string, bool, error) { - return "", false, nil -} - -func (f *fakeProvider) RunRestore( - ctx context.Context, - snapshotID string, - volumePath string, - updater uploader.ProgressUpdater) error { - return nil -} - -func (f *fakeProvider) Close(ctx context.Context) error { - return nil -} diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 5a8e380fc0..3d1b707d12 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -39,40 +39,37 @@ import ( "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/datapath" + "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/podvolume" "github.com/vmware-tanzu/velero/pkg/repository" - repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" "github.com/vmware-tanzu/velero/pkg/restorehelper" "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/uploader/provider" "github.com/vmware-tanzu/velero/pkg/util/boolptr" "github.com/vmware-tanzu/velero/pkg/util/filesystem" - "github.com/vmware-tanzu/velero/pkg/util/kube" ) -func NewPodVolumeRestoreReconciler(logger logrus.FieldLogger, client client.Client, credentialGetter *credentials.CredentialGetter) *PodVolumeRestoreReconciler { +func NewPodVolumeRestoreReconciler(client client.Client, ensurer *repository.Ensurer, + credentialGetter *credentials.CredentialGetter, logger logrus.FieldLogger) *PodVolumeRestoreReconciler { return &PodVolumeRestoreReconciler{ - Client: client, - logger: logger.WithField("controller", "PodVolumeRestore"), - credentialGetter: credentialGetter, - fileSystem: filesystem.NewFileSystem(), - clock: &clocks.RealClock{}, + Client: client, + logger: logger.WithField("controller", "PodVolumeRestore"), + repositoryEnsurer: ensurer, + credentialGetter: credentialGetter, + fileSystem: filesystem.NewFileSystem(), + clock: &clocks.RealClock{}, + dataPathMgr: datapath.NewManager(1), } } type PodVolumeRestoreReconciler struct { client.Client - logger logrus.FieldLogger - credentialGetter *credentials.CredentialGetter - fileSystem filesystem.Interface - clock clocks.WithTickerAndDelayedExecution -} - -type RestoreProgressUpdater struct { - PodVolumeRestore *velerov1api.PodVolumeRestore - Log logrus.FieldLogger - Ctx context.Context - Cli client.Client + logger logrus.FieldLogger + repositoryEnsurer *repository.Ensurer + credentialGetter *credentials.CredentialGetter + 
fileSystem filesystem.Interface + clock clocks.WithTickerAndDelayedExecution + dataPathMgr *datapath.Manager } // +kubebuilder:rbac:groups=velero.io,resources=podvolumerestores,verbs=get;list;watch;create;update;patch;delete @@ -113,41 +110,69 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req } log.Info("Restore starting") + + callbacks := datapath.Callbacks{ + OnCompleted: c.OnDataPathCompleted, + OnFailed: c.OnDataPathFailed, + OnCancelled: c.OnDataPathCancelled, + OnProgress: c.OnDataPathProgress, + } + + fsRestore, err := c.dataPathMgr.CreateFileSystemBR(pvr.Name, pVBRRequestor, ctx, c.Client, pvr.Namespace, callbacks, log) + if err != nil { + if err == datapath.ConcurrentLimitExceed { + return ctrl.Result{Requeue: true, RequeueAfter: time.Minute}, nil + } else { + return c.errorOut(ctx, pvr, err, "error to create data path", log) + } + } + original := pvr.DeepCopy() pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseInProgress pvr.Status.StartTimestamp = &metav1.Time{Time: c.clock.Now()} if err = c.Patch(ctx, pvr, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Unable to update status to in progress") - return ctrl.Result{}, err + return c.errorOut(ctx, pvr, err, "error to update status to in progress", log) } - if err = c.processRestore(ctx, pvr, pod, log); err != nil { - if e := UpdatePVRStatusToFailed(ctx, c, pvr, err.Error(), c.clock.Now()); e != nil { - log.WithError(err).Error("Unable to update status to failed") - } + volumePath, err := exposer.GetPodVolumeHostPath(ctx, pod, pvr.Spec.Volume, c.Client, c.fileSystem, log) + if err != nil { + return c.errorOut(ctx, pvr, err, "error exposing host path for pod volume", log) + } - log.WithError(err).Error("Unable to process the PodVolumeRestore") - return ctrl.Result{}, err + log.WithField("path", volumePath.ByPath).Debugf("Found host path") + + if err := fsRestore.Init(ctx, pvr.Spec.BackupStorageLocation, pvr.Spec.Pod.Namespace, pvr.Spec.UploaderType, + podvolume.GetPvrRepositoryType(pvr), c.repositoryEnsurer, c.credentialGetter); err != nil { + return c.errorOut(ctx, pvr, err, "error to initialize data path", log) } - original = pvr.DeepCopy() - pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted - pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()} - if err = c.Patch(ctx, pvr, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Unable to update status to completed") - return ctrl.Result{}, err + if err := fsRestore.StartRestore(pvr.Spec.SnapshotID, volumePath); err != nil { + return c.errorOut(ctx, pvr, err, "error starting data path restore", log) } - log.Info("Restore completed") + + log.WithField("path", volumePath.ByPath).Info("Async fs restore data path started") + return ctrl.Result{}, nil } -func UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvr *velerov1api.PodVolumeRestore, errString string, time time.Time) error { - original := pvr.DeepCopy() - pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed - pvr.Status.Message = errString - pvr.Status.CompletionTimestamp = &metav1.Time{Time: time} +func (c *PodVolumeRestoreReconciler) errorOut(ctx context.Context, pvr *velerov1api.PodVolumeRestore, err error, msg string, log logrus.FieldLogger) (ctrl.Result, error) { + c.closeDataPath(ctx, pvr.Name) + _ = UpdatePVRStatusToFailed(ctx, c.Client, pvr, errors.WithMessage(err, msg).Error(), c.clock.Now(), log) + return ctrl.Result{}, err +} - return c.Patch(ctx, pvr, client.MergeFrom(original)) +func 
UpdatePVRStatusToFailed(ctx context.Context, c client.Client, pvb *velerov1api.PodVolumeRestore, errString string, time time.Time, log logrus.FieldLogger) error { + original := pvb.DeepCopy() + pvb.Status.Phase = velerov1api.PodVolumeRestorePhaseFailed + pvb.Status.Message = errString + pvb.Status.CompletionTimestamp = &metav1.Time{Time: time} + + if err := c.Patch(ctx, pvb, client.MergeFrom(original)); err != nil { + log.WithError(err).Error("error updating PodVolumeRestore status") + return err + } else { + return nil + } } func (c *PodVolumeRestoreReconciler) shouldProcess(ctx context.Context, log logrus.FieldLogger, pvr *velerov1api.PodVolumeRestore) (bool, *corev1api.Pod, error) { @@ -232,54 +257,23 @@ func getInitContainerIndex(pod *corev1api.Pod) int { return -1 } -func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *velerov1api.PodVolumeRestore, pod *corev1api.Pod, log logrus.FieldLogger) error { - volumeDir, err := kube.GetVolumeDirectory(ctx, log, pod, req.Spec.Volume, c.Client) - if err != nil { - return errors.Wrap(err, "error getting volume directory name") - } - - // Get the full path of the new volume's directory as mounted in the daemonset pod, which - // will look like: /host_pods//volumes// - volumePath, err := kube.SinglePathMatch( - fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(req.Spec.Pod.UID), volumeDir), - c.fileSystem, log) - if err != nil { - return errors.Wrap(err, "error identifying path of volume") - } +func (c *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) { + defer c.closeDataPath(ctx, pvrName) - backupLocation := &velerov1api.BackupStorageLocation{} - if err := c.Get(ctx, client.ObjectKey{ - Namespace: req.Namespace, - Name: req.Spec.BackupStorageLocation, - }, backupLocation); err != nil { - return errors.Wrap(err, "error getting backup storage location") - } + log := c.logger.WithField("pvr", pvrName) - // need to check backup repository in source namespace rather than in pod namespace - // such as in case of namespace mapping issue - backupRepo, err := repository.GetBackupRepository(ctx, c.Client, req.Namespace, repository.BackupRepositoryKey{ - VolumeNamespace: req.Spec.SourceNamespace, - BackupLocation: req.Spec.BackupStorageLocation, - RepositoryType: podvolume.GetPvrRepositoryType(req), - }) - if err != nil { - return errors.Wrap(err, "error getting backup repository") - } + log.WithField("PVR", pvrName).Info("Async fs restore data path completed") - uploaderProv, err := provider.NewUploaderProvider(ctx, c.Client, req.Spec.UploaderType, - req.Spec.RepoIdentifier, backupLocation, backupRepo, c.credentialGetter, repokey.RepoKeySelector(), log) - if err != nil { - return errors.Wrap(err, "error creating uploader") + var pvr velerov1api.PodVolumeRestore + if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { + log.WithError(err).Warn("Failed to get PVR on completion") + return } - defer func() { - if err := uploaderProv.Close(ctx); err != nil { - log.Errorf("failed to close uploader provider with error %v", err) - } - }() - - if err = uploaderProv.RunRestore(ctx, req.Spec.SnapshotID, volumePath, c.NewRestoreProgressUpdater(ctx, req, log)); err != nil { - return errors.Wrapf(err, "error running restore err=%v", err) + volumePath := result.Restore.Target.ByPath + if volumePath == "" { + _, _ = c.errorOut(ctx, &pvr, errors.New("path is empty"), "invalid restore target", log) + return } // Remove the 
.velero directory from the restored volume (it may contain done files from previous restores
@@ -291,7 +285,7 @@ func (c *PodVolumeRestoreReconciler) processRestore(ctx context.Context, req *ve
 	}
 
 	var restoreUID types.UID
-	for _, owner := range req.OwnerReferences {
+	for _, owner := range pvr.OwnerReferences {
 		if boolptr.IsSetToTrue(owner.Controller) {
 			restoreUID = owner.UID
 			break
@@ -301,32 +295,80 @@
 	// Create the .velero directory within the volume dir so we can write a done file
 	// for this restore.
 	if err := os.MkdirAll(filepath.Join(volumePath, ".velero"), 0755); err != nil {
-		return errors.Wrap(err, "error creating .velero directory for done file")
+		_, _ = c.errorOut(ctx, &pvr, err, "error creating .velero directory for done file", log)
+		return
 	}
 
 	// Write a done file with name= into the just-created .velero dir
 	// within the volume. The velero init container on the pod is waiting
 	// for this file to exist in each restored volume before completing.
 	if err := os.WriteFile(filepath.Join(volumePath, ".velero", string(restoreUID)), nil, 0644); err != nil { //nolint:gosec
-		return errors.Wrap(err, "error writing done file")
+		_, _ = c.errorOut(ctx, &pvr, err, "error writing done file", log)
+		return
 	}
 
-	return nil
+	original := pvr.DeepCopy()
+	pvr.Status.Phase = velerov1api.PodVolumeRestorePhaseCompleted
+	pvr.Status.CompletionTimestamp = &metav1.Time{Time: c.clock.Now()}
+	if err := c.Patch(ctx, &pvr, client.MergeFrom(original)); err != nil {
+		log.WithError(err).Error("error updating PodVolumeRestore status")
+	}
+
+	log.Info("Restore completed")
+}
+
+func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) {
+	defer c.closeDataPath(ctx, pvrName)
+
+	log := c.logger.WithField("pvr", pvrName)
+
+	log.WithError(err).Error("Async fs restore data path failed")
+
+	var pvr velerov1api.PodVolumeRestore
+	if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil {
+		log.WithError(getErr).Warn("Failed to get PVR on failure")
+	} else {
+		_, _ = c.errorOut(ctx, &pvr, err, "data path restore failed", log)
+	}
 }
 
-func (c *PodVolumeRestoreReconciler) NewRestoreProgressUpdater(ctx context.Context, pvr *velerov1api.PodVolumeRestore, log logrus.FieldLogger) *RestoreProgressUpdater {
-	return &RestoreProgressUpdater{pvr, log, ctx, c.Client}
+func (c *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) {
+	defer c.closeDataPath(ctx, pvrName)
+
+	log := c.logger.WithField("pvr", pvrName)
+
+	log.Warn("Async fs restore data path canceled")
+
+	var pvr velerov1api.PodVolumeRestore
+	if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil {
+		log.WithError(getErr).Warn("Failed to get PVR on cancel")
+	} else {
+		_, _ = c.errorOut(ctx, &pvr, errors.New("PVR is canceled"), "data path restore canceled", log)
+	}
 }
 
-// UpdateProgress which implement ProgressUpdater interface to update pvr progress status
-func (c *RestoreProgressUpdater) UpdateProgress(p *uploader.Progress) {
-	original := c.PodVolumeRestore.DeepCopy()
-	c.PodVolumeRestore.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}
-	if c.Cli == nil {
-		c.Log.Errorf("failed to update restore pod %s volume %s progress with uninitailize client", c.PodVolumeRestore.Spec.Pod.Name, c.PodVolumeRestore.Spec.Volume)
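+// OnDataPathProgress mirrors the backup side: progress events arrive while the
+// restore is still running, so this hook only records the byte counters and
+// leaves closing the data path to the terminal callbacks.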
+func (c *PodVolumeRestoreReconciler) OnDataPathProgress(ctx context.Context, namespace string, pvrName string, progress *uploader.Progress) {
+	log := c.logger.WithField("pvr", pvrName)
+
+	var pvr velerov1api.PodVolumeRestore
+	if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil {
+		log.WithError(err).Warn("Failed to get PVR on progress")
 		return
 	}
-	if err := c.Cli.Patch(c.Ctx, c.PodVolumeRestore, client.MergeFrom(original)); err != nil {
-		c.Log.Errorf("update restore pod %s volume %s progress with %v", c.PodVolumeRestore.Spec.Pod.Name, c.PodVolumeRestore.Spec.Volume, err)
+
+	original := pvr.DeepCopy()
+	pvr.Status.Progress = velerov1api.PodVolumeOperationProgress{TotalBytes: progress.TotalBytes, BytesDone: progress.BytesDone}
+
+	if err := c.Client.Patch(ctx, &pvr, client.MergeFrom(original)); err != nil {
+		log.WithError(err).Error("Failed to update progress")
+	}
+}
+
+func (c *PodVolumeRestoreReconciler) closeDataPath(ctx context.Context, pvrName string) {
+	fsRestore := c.dataPathMgr.GetAsyncBR(pvrName)
+	if fsRestore != nil {
+		fsRestore.Close(ctx)
 	}
+
+	c.dataPathMgr.RemoveAsyncBR(pvrName)
 }
diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go
new file mode 100644
index 0000000000..6d7d915bb1
--- /dev/null
+++ b/pkg/datapath/file_system.go
@@ -0,0 +1,195 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package datapath + +import ( + "context" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "sigs.k8s.io/controller-runtime/pkg/client" + + "github.com/vmware-tanzu/velero/internal/credentials" + velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" + "github.com/vmware-tanzu/velero/pkg/repository" + repokey "github.com/vmware-tanzu/velero/pkg/repository/keys" + repoProvider "github.com/vmware-tanzu/velero/pkg/repository/provider" + "github.com/vmware-tanzu/velero/pkg/uploader" + "github.com/vmware-tanzu/velero/pkg/uploader/provider" +) + +type fileSystemBR struct { + ctx context.Context + cancel context.CancelFunc + backupRepo *velerov1api.BackupRepository + uploaderProv provider.Provider + log logrus.FieldLogger + client client.Client + backupLocation *velerov1api.BackupStorageLocation + namespace string + initialized bool + callbacks Callbacks + jobName string + requestorType string +} + +func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR { + fs := &fileSystemBR{ + jobName: jobName, + requestorType: requestorType, + client: client, + namespace: namespace, + callbacks: callbacks, + log: log, + } + + return fs +} + +func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, + repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { + var err error + defer func() { + if err != nil { + fs.Close(ctx) + } + }() + + fs.ctx, fs.cancel = context.WithCancel(ctx) + + backupLocation := &velerov1api.BackupStorageLocation{} + if err = fs.client.Get(ctx, client.ObjectKey{ + Namespace: fs.namespace, + Name: bslName, + }, backupLocation); err != nil { + return errors.Wrapf(err, "error getting backup storage location %s", bslName) + } + + fs.backupLocation = backupLocation + + fs.backupRepo, err = repositoryEnsurer.EnsureRepo(ctx, fs.namespace, sourceNamespace, bslName, repositoryType) + if err != nil { + return errors.Wrapf(err, "error to ensure backup repository %s-%s-%s", bslName, sourceNamespace, repositoryType) + } + + err = fs.boostRepoConnect(ctx, repositoryType, credentialGetter) + if err != nil { + return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", bslName, sourceNamespace, repositoryType) + } + + fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, "", + fs.backupLocation, fs.backupRepo, credentialGetter, repokey.RepoKeySelector(), fs.log) + if err != nil { + return errors.Wrapf(err, "error creating uploader %s", uploaderType) + } + + fs.initialized = true + + fs.log.WithFields( + logrus.Fields{ + "jobName": fs.jobName, + "bsl": bslName, + "source namespace": sourceNamespace, + "uploader": uploaderType, + "repository": repositoryType, + }).Info("FileSystemBR is initialized") + + return nil +} + +func (fs *fileSystemBR) Close(ctx context.Context) { + if fs.uploaderProv != nil { + if err := fs.uploaderProv.Close(ctx); err != nil { + fs.log.Errorf("failed to close uploader provider with error %v", err) + } + + fs.uploaderProv = nil + } + + if fs.cancel != nil { + fs.cancel() + fs.cancel = nil + } + + fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") +} + +func (fs *fileSystemBR) StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error { + if !fs.initialized { + return errors.New("file system data path is not initialized") + 
}
+
+	go func() {
+		snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, tags, forceFull, parentSnapshot, fs)
+
+		if err == provider.ErrorCanceled {
+			fs.callbacks.OnCancelled(fs.ctx, fs.namespace, fs.jobName)
+		} else if err != nil {
+			fs.callbacks.OnFailed(fs.ctx, fs.namespace, fs.jobName, err)
+		} else {
+			fs.callbacks.OnCompleted(fs.ctx, fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}})
+		}
+	}()
+
+	return nil
+}
+
+func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) error {
+	if !fs.initialized {
+		return errors.New("file system data path is not initialized")
+	}
+
+	go func() {
+		err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, fs)
+
+		if err == provider.ErrorCanceled {
+			fs.callbacks.OnCancelled(fs.ctx, fs.namespace, fs.jobName)
+		} else if err != nil {
+			fs.callbacks.OnFailed(fs.ctx, fs.namespace, fs.jobName, err)
+		} else {
+			fs.callbacks.OnCompleted(fs.ctx, fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}})
+		}
+	}()
+
+	return nil
+}
+
+// UpdateProgress implements the ProgressUpdater interface to update progress status
+func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) {
+	if fs.callbacks.OnProgress != nil {
+		fs.callbacks.OnProgress(fs.ctx, fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone})
+	}
+}
+
+func (fs *fileSystemBR) Cancel() {
+	fs.cancel()
+	fs.log.WithField("user", fs.jobName).Info("FileSystemBR is canceled")
+}
+
+func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType string, credentialGetter *credentials.CredentialGetter) error {
+	// All repository types currently go through the unified repo provider, so a
+	// single call covers both kopia and restic.
+	if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/pkg/datapath/file_system_test.go b/pkg/datapath/file_system_test.go
new file mode 100644
index 0000000000..19c36f6def
--- /dev/null
+++ b/pkg/datapath/file_system_test.go
@@ -0,0 +1,197 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package datapath + +import ( + "context" + "testing" + + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + velerotest "github.com/vmware-tanzu/velero/pkg/test" + "github.com/vmware-tanzu/velero/pkg/uploader/provider" + providerMock "github.com/vmware-tanzu/velero/pkg/uploader/provider/mocks" +) + +func TestAsyncBackup(t *testing.T) { + var asyncErr error + var asyncResult Result + finish := make(chan struct{}) + + tests := []struct { + name string + uploaderProv provider.Provider + callbacks Callbacks + err error + result Result + path string + }{ + { + name: "async backup fail", + callbacks: Callbacks{ + OnCompleted: nil, + OnCancelled: nil, + OnFailed: func(ctx context.Context, namespace string, job string, err error) { + asyncErr = err + asyncResult = Result{} + finish <- struct{}{} + }, + }, + err: errors.New("fake-error"), + }, + { + name: "async backup cancel", + callbacks: Callbacks{ + OnCompleted: nil, + OnFailed: nil, + OnCancelled: func(ctx context.Context, namespace string, job string) { + asyncErr = provider.ErrorCanceled + asyncResult = Result{} + finish <- struct{}{} + }, + }, + err: provider.ErrorCanceled, + }, + { + name: "async backup complete", + callbacks: Callbacks{ + OnFailed: nil, + OnCancelled: nil, + OnCompleted: func(ctx context.Context, namespace string, job string, result Result) { + asyncResult = result + asyncErr = nil + finish <- struct{}{} + }, + }, + result: Result{ + Backup: BackupResult{ + SnapshotID: "fake-snapshot", + EmptySnapshot: false, + Source: AccessPoint{ByPath: "fake-path"}, + }, + }, + path: "fake-path", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) + mockProvider := providerMock.NewProvider(t) + mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) + fs.uploaderProv = mockProvider + fs.initialized = true + fs.callbacks = test.callbacks + + err := fs.StartBackup(AccessPoint{ByPath: test.path}, "", false, nil) + require.Equal(t, nil, err) + + <-finish + + assert.Equal(t, asyncErr, test.err) + assert.Equal(t, asyncResult, test.result) + }) + } + + close(finish) +} + +func TestAsyncRestore(t *testing.T) { + var asyncErr error + var asyncResult Result + finish := make(chan struct{}) + + tests := []struct { + name string + uploaderProv provider.Provider + callbacks Callbacks + err error + result Result + path string + snapshot string + }{ + { + name: "async restore fail", + callbacks: Callbacks{ + OnCompleted: nil, + OnCancelled: nil, + OnFailed: func(ctx context.Context, namespace string, job string, err error) { + asyncErr = err + asyncResult = Result{} + finish <- struct{}{} + }, + }, + err: errors.New("fake-error"), + }, + { + name: "async restore cancel", + callbacks: Callbacks{ + OnCompleted: nil, + OnFailed: nil, + OnCancelled: func(ctx context.Context, namespace string, job string) { + asyncErr = provider.ErrorCanceled + asyncResult = Result{} + finish <- struct{}{} + }, + }, + err: provider.ErrorCanceled, + }, + { + name: "async restore complete", + callbacks: Callbacks{ + OnFailed: nil, + OnCancelled: nil, + OnCompleted: func(ctx context.Context, namespace string, job string, result Result) { + asyncResult = result + asyncErr = nil + finish <- struct{}{} 
+ }, + }, + result: Result{ + Restore: RestoreResult{ + Target: AccessPoint{ByPath: "fake-path"}, + }, + }, + path: "fake-path", + snapshot: "fake-snapshot", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) + mockProvider := providerMock.NewProvider(t) + mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err) + fs.uploaderProv = mockProvider + fs.initialized = true + fs.callbacks = test.callbacks + + err := fs.StartRestore(test.snapshot, AccessPoint{ByPath: test.path}) + require.Equal(t, nil, err) + + <-finish + + assert.Equal(t, asyncErr, test.err) + assert.Equal(t, asyncResult, test.result) + }) + } + + close(finish) +} diff --git a/pkg/datapath/manager.go b/pkg/datapath/manager.go new file mode 100644 index 0000000000..2af3232539 --- /dev/null +++ b/pkg/datapath/manager.go @@ -0,0 +1,77 @@ +/* +Copyright The Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package datapath + +import ( + "context" + "sync" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var ConcurrentLimitExceed error = errors.New("Concurrent number exceeds") +var FSBRCreator = newFileSystemBR + +type Manager struct { + cocurrentNum int + trackerLock sync.Mutex + tracker map[string]AsyncBR +} + +// NewManager creates the data path manager to manage concurrent data path instances +func NewManager(cocurrentNum int) *Manager { + return &Manager{ + cocurrentNum: cocurrentNum, + tracker: map[string]AsyncBR{}, + } +} + +// CreateFileSystemBR creates a new file system backup/restore data path instance +func (m *Manager) CreateFileSystemBR(jobName string, requestorType string, ctx context.Context, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) (AsyncBR, error) { + m.trackerLock.Lock() + defer m.trackerLock.Unlock() + + if len(m.tracker) == m.cocurrentNum { + return nil, ConcurrentLimitExceed + } + + m.tracker[jobName] = FSBRCreator(jobName, requestorType, client, namespace, callbacks, log) + + return m.tracker[jobName], nil +} + +// RemoveAsyncBR removes a file system backup/restore data path instance +func (m *Manager) RemoveAsyncBR(jobName string) { + m.trackerLock.Lock() + defer m.trackerLock.Unlock() + + delete(m.tracker, jobName) +} + +// GetAsyncBR returns the file system backup/restore data path instance for the specified job name +func (m *Manager) GetAsyncBR(jobName string) AsyncBR { + m.trackerLock.Lock() + defer m.trackerLock.Unlock() + + if async, exist := m.tracker[jobName]; exist { + return async + } else { + return nil + } +} diff --git a/pkg/datapath/manager_test.go b/pkg/datapath/manager_test.go new file mode 100644 index 0000000000..e4b4d40c63 --- /dev/null +++ b/pkg/datapath/manager_test.go @@ -0,0 +1,52 @@ +/* +Copyright The Velero Contributors. 
diff --git a/pkg/datapath/manager_test.go b/pkg/datapath/manager_test.go
new file mode 100644
index 0000000000..e4b4d40c63
--- /dev/null
+++ b/pkg/datapath/manager_test.go
@@ -0,0 +1,52 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package datapath
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestManager(t *testing.T) {
+	m := NewManager(2)
+
+	asyncJob1, err := m.CreateFileSystemBR("job-1", "test", context.TODO(), nil, "velero", Callbacks{}, nil)
+	assert.NoError(t, err)
+
+	_, err = m.CreateFileSystemBR("job-2", "test", context.TODO(), nil, "velero", Callbacks{}, nil)
+	assert.NoError(t, err)
+
+	_, err = m.CreateFileSystemBR("job-3", "test", context.TODO(), nil, "velero", Callbacks{}, nil)
+	assert.Equal(t, ConcurrentLimitExceed, err)
+
+	ret := m.GetAsyncBR("job-0")
+	assert.Equal(t, nil, ret)
+
+	ret = m.GetAsyncBR("job-1")
+	assert.Equal(t, asyncJob1, ret)
+
+	m.RemoveAsyncBR("job-0")
+	assert.Equal(t, 2, len(m.tracker))
+
+	m.RemoveAsyncBR("job-1")
+	assert.Equal(t, 1, len(m.tracker))
+
+	ret = m.GetAsyncBR("job-1")
+	assert.Equal(t, nil, ret)
+}
diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go
new file mode 100644
index 0000000000..97926ca154
--- /dev/null
+++ b/pkg/datapath/types.go
@@ -0,0 +1,74 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package datapath
+
+import (
+	"context"
+
+	"github.com/vmware-tanzu/velero/internal/credentials"
+	"github.com/vmware-tanzu/velero/pkg/repository"
+	"github.com/vmware-tanzu/velero/pkg/uploader"
+)
+
+// Result represents the result of a backup/restore
+type Result struct {
+	Backup  BackupResult
+	Restore RestoreResult
+}
+
+// BackupResult represents the result of a backup
+type BackupResult struct {
+	SnapshotID    string
+	EmptySnapshot bool
+	Source        AccessPoint
+}
+
+// RestoreResult represents the result of a restore
+type RestoreResult struct {
+	Target AccessPoint
+}
+
+// Callbacks defines the collection of callbacks during backup/restore
+type Callbacks struct {
+	OnCompleted func(context.Context, string, string, Result)
+	OnFailed    func(context.Context, string, string, error)
+	OnCancelled func(context.Context, string, string)
+	OnProgress  func(context.Context, string, string, *uploader.Progress)
+}
+
+// AccessPoint represents an access point that has been exposed to a data path instance
+type AccessPoint struct {
+	ByPath string
+}
+
+// AsyncBR is the interface for asynchronous data path methods
+type AsyncBR interface {
+	// Init initializes an asynchronous data path instance
+	Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error
+
+	// StartBackup starts an asynchronous data path instance for backup
+	StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error
+
+	// StartRestore starts an asynchronous data path instance for restore
+	StartRestore(snapshotID string, target AccessPoint) error
+
+	// Cancel cancels an asynchronous data path instance
+	Cancel()
+
+	// Close closes an asynchronous data path instance
+	Close(ctx context.Context)
+}
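Taken together with the manager above, this interface implies a create, Init, StartBackup, callback, Close lifecycle. Below is a minimal Go sketch of that flow. startBackupDataPath and closeDataPath are hypothetical helpers; the job name, requestor type, BSL and namespace values are placeholders; and the Init signature is the one introduced here (the second commit in this series threads an extra repoIdentifier parameter through it):

package example

import (
	"context"

	"github.com/sirupsen/logrus"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/vmware-tanzu/velero/internal/credentials"
	"github.com/vmware-tanzu/velero/pkg/datapath"
	"github.com/vmware-tanzu/velero/pkg/repository"
)

// startBackupDataPath sketches the expected call sequence: create the instance
// through the manager, Init it against the backup repository, then fire
// StartBackup and let the callbacks report the terminal state.
func startBackupDataPath(ctx context.Context, m *datapath.Manager, cli client.Client,
	ensurer *repository.Ensurer, credGetter *credentials.CredentialGetter, log logrus.FieldLogger) error {
	const jobName = "job-1" // illustrative; controllers use the CR name

	callbacks := datapath.Callbacks{
		OnCompleted: func(ctx context.Context, namespace string, job string, result datapath.Result) {
			log.WithField("snapshot", result.Backup.SnapshotID).Info("data path completed")
			closeDataPath(ctx, m, job)
		},
		OnFailed: func(ctx context.Context, namespace string, job string, err error) {
			log.WithError(err).Error("data path failed")
			closeDataPath(ctx, m, job)
		},
		OnCancelled: func(ctx context.Context, namespace string, job string) {
			log.Warn("data path canceled")
			closeDataPath(ctx, m, job)
		},
	}

	asyncBR, err := m.CreateFileSystemBR(jobName, "test-requestor", ctx, cli, "velero", callbacks, log)
	if err != nil {
		return err
	}

	if err := asyncBR.Init(ctx, "default", "app-ns", "kopia", "kopia", ensurer, credGetter); err != nil {
		closeDataPath(ctx, m, jobName)
		return err
	}

	// StartBackup returns immediately; the outcome arrives via the callbacks.
	return asyncBR.StartBackup(datapath.AccessPoint{ByPath: "/host_pods/fake-uid/volumes/fake-plugin/fake-pv"}, "", false, nil)
}

func closeDataPath(ctx context.Context, m *datapath.Manager, job string) {
	if asyncBR := m.GetAsyncBR(job); asyncBR != nil {
		asyncBR.Close(ctx)
	}
	m.RemoveAsyncBR(job)
}

StartBackup returns as soon as the data path goroutine is launched; every terminal state arrives through a callback, which is why the instance is closed and removed from the tracker only inside the callback handlers.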
diff --git a/pkg/exposer/host_path.go b/pkg/exposer/host_path.go
new file mode 100644
index 0000000000..82f3e075e5
--- /dev/null
+++ b/pkg/exposer/host_path.go
@@ -0,0 +1,61 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package exposer
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	corev1 "k8s.io/api/core/v1"
+	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
+
+	"github.com/vmware-tanzu/velero/pkg/datapath"
+	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
+	"github.com/vmware-tanzu/velero/pkg/util/kube"
+)
+
+var getVolumeDirectory = kube.GetVolumeDirectory
+var singlePathMatch = kube.SinglePathMatch
+
+// GetPodVolumeHostPath returns a path that can be accessed from the host for a given volume of a pod
+func GetPodVolumeHostPath(ctx context.Context, pod *corev1.Pod, pvcName string,
+	cli ctrlclient.Client, fs filesystem.Interface, log logrus.FieldLogger) (datapath.AccessPoint, error) {
+	logger := log.WithField("pod name", pod.Name).WithField("pod UID", pod.GetUID()).WithField("pvc", pvcName)
+
+	volDir, err := getVolumeDirectory(ctx, logger, pod, pvcName, cli)
+	if err != nil {
+		return datapath.AccessPoint{}, errors.Wrapf(err, "error getting volume directory name for pvc %s in pod %s", pvcName, pod.Name)
+	}
+
+	logger.WithField("volDir", volDir).Info("Got volume directory for pod volume")
+
+	pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", string(pod.GetUID()), volDir)
+	logger.WithField("pathGlob", pathGlob).Debug("Looking for path matching glob")
+
+	path, err := singlePathMatch(pathGlob, fs, logger)
+	if err != nil {
+		return datapath.AccessPoint{}, errors.Wrapf(err, "error identifying unique volume path on host for pvc %s in pod %s", pvcName, pod.Name)
+	}
+
+	logger.WithField("path", path).Info("Found path matching glob")
+
+	return datapath.AccessPoint{
+		ByPath: path,
+	}, nil
+}
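As a usage sketch, the helper below shows where this function sits in the flow: resolve the pod volume to a host path first, then hand the resulting AccessPoint to AsyncBR.StartBackup or StartRestore. exposePodVolume is a hypothetical wrapper, and the comment on /host_pods reflects how the node-agent pod is expected to mount the kubelet pods directory:

package example

import (
	"context"

	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/vmware-tanzu/velero/pkg/datapath"
	"github.com/vmware-tanzu/velero/pkg/exposer"
	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
)

// exposePodVolume resolves a pod volume to a host path so the returned
// AccessPoint.ByPath can be fed straight into a data path instance.
func exposePodVolume(ctx context.Context, pod *corev1.Pod, volume string,
	cli ctrlclient.Client, log logrus.FieldLogger) (datapath.AccessPoint, error) {
	// GetPodVolumeHostPath globs under /host_pods, so the node-agent must have
	// the kubelet pods directory mounted at that path.
	return exposer.GetPodVolumeHostPath(ctx, pod, volume, cli, filesystem.NewFileSystem(), log)
}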
diff --git a/pkg/exposer/host_path_test.go b/pkg/exposer/host_path_test.go
new file mode 100644
index 0000000000..9300be6f08
--- /dev/null
+++ b/pkg/exposer/host_path_test.go
@@ -0,0 +1,81 @@
+/*
+Copyright The Velero Contributors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package exposer
+
+import (
+	"context"
+	"testing"
+
+	"github.com/pkg/errors"
+	"github.com/sirupsen/logrus"
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
+
+	velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
+	"github.com/vmware-tanzu/velero/pkg/builder"
+	velerotest "github.com/vmware-tanzu/velero/pkg/test"
+	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
+)
+
+func TestGetPodVolumeHostPath(t *testing.T) {
+	tests := []struct {
+		name             string
+		getVolumeDirFunc func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error)
+		pathMatchFunc    func(string, filesystem.Interface, logrus.FieldLogger) (string, error)
+		pod              *corev1.Pod
+		pvc              string
+		err              string
+	}{
+		{
+			name: "get volume dir fail",
+			getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error) {
+				return "", errors.New("fake-error-1")
+			},
+			pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-1").Result(),
+			pvc: "fake-pvc-1",
+			err: "error getting volume directory name for pvc fake-pvc-1 in pod fake-pod-1: fake-error-1",
+		},
+		{
+			name: "single path match fail",
+			getVolumeDirFunc: func(context.Context, logrus.FieldLogger, *corev1.Pod, string, ctrlclient.Client) (string, error) {
+				return "", nil
+			},
+			pathMatchFunc: func(string, filesystem.Interface, logrus.FieldLogger) (string, error) {
+				return "", errors.New("fake-error-2")
+			},
+			pod: builder.ForPod(velerov1api.DefaultNamespace, "fake-pod-2").Result(),
+			pvc: "fake-pvc-1",
+			err: "error identifying unique volume path on host for pvc fake-pvc-1 in pod fake-pod-2: fake-error-2",
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			if test.getVolumeDirFunc != nil {
+				getVolumeDirectory = test.getVolumeDirFunc
+			}
+
+			if test.pathMatchFunc != nil {
+				singlePathMatch = test.pathMatchFunc
+			}
+
+			_, err := GetPodVolumeHostPath(context.Background(), test.pod, test.pvc, nil, nil, velerotest.NewLogger())
+			assert.EqualError(t, err, test.err)
+		})
+	}
+}
diff --git a/pkg/uploader/kopia/snapshot.go b/pkg/uploader/kopia/snapshot.go
index 0bc09f35a2..17a6ac39c3 100644
--- a/pkg/uploader/kopia/snapshot.go
+++ b/pkg/uploader/kopia/snapshot.go
@@ -94,7 +94,7 @@ func setupDefaultPolicy(ctx context.Context, rep repo.RepositoryWriter, sourceIn
 
 // Backup backup specific sourcePath and update progress
 func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string,
-	parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
+	forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
 	if fsUploader == nil {
 		return nil, false, errors.New("get empty kopia uploader")
 	}
@@ -122,7 +122,7 @@ func Backup(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter rep
 	}
 
 	kopiaCtx := logging.SetupKopiaLog(ctx, log)
-	snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, rootDir, parentSnapshot, log, "Kopia Uploader")
+	snapID, snapshotSize, err := SnapshotSource(kopiaCtx, repoWriter, fsUploader, sourceInfo, rootDir, forceFull, parentSnapshot, tags, log, "Kopia Uploader")
 	if err != nil {
 		return nil, false, err
 	}
@@ -170,7 +170,9 @@ func SnapshotSource(
 	u SnapshotUploader,
 	sourceInfo snapshot.SourceInfo,
 	rootDir fs.Entry,
+	forceFull bool,
 	parentSnapshot string,
+	snapshotTags
map[string]string, log logrus.FieldLogger, description string, ) (string, int64, error) { @@ -178,21 +180,24 @@ func SnapshotSource( snapshotStartTime := time.Now() var previous []*snapshot.Manifest - if parentSnapshot != "" { - mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot)) - if err != nil { - return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot) - } + if !forceFull { + if parentSnapshot != "" { + mani, err := loadSnapshotFunc(ctx, rep, manifest.ID(parentSnapshot)) + if err != nil { + return "", 0, errors.Wrapf(err, "Failed to load previous snapshot %v from kopia", parentSnapshot) + } - previous = append(previous, mani) - } else { - pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, nil) - if err != nil { - return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo) - } + previous = append(previous, mani) + } else { + pre, err := findPreviousSnapshotManifest(ctx, rep, sourceInfo, snapshotTags, nil) + if err != nil { + return "", 0, errors.Wrapf(err, "Failed to find previous kopia snapshot manifests for si %v", sourceInfo) + } - previous = pre + previous = pre + } } + var manifest *snapshot.Manifest if err := setupDefaultPolicy(ctx, rep, sourceInfo); err != nil { return "", 0, errors.Wrapf(err, "unable to set policy for si %v", sourceInfo) @@ -208,6 +213,8 @@ func SnapshotSource( return "", 0, errors.Wrapf(err, "Failed to upload the kopia snapshot for si %v", sourceInfo) } + manifest.Tags = snapshotTags + manifest.Description = description if _, err = saveSnapshotFunc(ctx, rep, manifest); err != nil { @@ -247,7 +254,7 @@ func reportSnapshotStatus(manifest *snapshot.Manifest, policyTree *policy.Tree) // findPreviousSnapshotManifest returns the list of previous snapshots for a given source, including // last complete snapshot following it. 
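+// Only manifests that carry the same snapshot requestor tag as the current job are
+// considered, so snapshots taken by different requestors never share an incremental chain.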
-func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, noLaterThan *time.Time) ([]*snapshot.Manifest, error) { +func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sourceInfo snapshot.SourceInfo, snapshotTags map[string]string, noLaterThan *time.Time) ([]*snapshot.Manifest, error) { man, err := snapshot.ListSnapshots(ctx, rep, sourceInfo) if err != nil { return nil, err @@ -257,6 +264,15 @@ func findPreviousSnapshotManifest(ctx context.Context, rep repo.Repository, sour var result []*snapshot.Manifest for _, p := range man { + requestor, found := p.Tags[uploader.SnapshotRequestorTag] + if !found { + continue + } + + if requestor != snapshotTags[uploader.SnapshotRequestorTag] { + continue + } + if noLaterThan != nil && p.StartTime.After(*noLaterThan) { continue } diff --git a/pkg/uploader/kopia/snapshot_test.go b/pkg/uploader/kopia/snapshot_test.go index 1af69f139d..a42078cc87 100644 --- a/pkg/uploader/kopia/snapshot_test.go +++ b/pkg/uploader/kopia/snapshot_test.go @@ -186,7 +186,7 @@ func TestSnapshotSource(t *testing.T) { t.Run(tc.name, func(t *testing.T) { s := injectSnapshotFuncs() MockFuncs(s, tc.args) - _, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, "/", log, "TestSnapshotSource") + _, _, err = SnapshotSource(ctx, s.repoWriterMock, s.uploderMock, sourceInfo, rootDir, false, "/", nil, log, "TestSnapshotSource") if tc.notError { assert.NoError(t, err) } else { diff --git a/pkg/uploader/provider/kopia.go b/pkg/uploader/provider/kopia.go index 022740f0bc..ea00be46fa 100644 --- a/pkg/uploader/provider/kopia.go +++ b/pkg/uploader/provider/kopia.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "sync/atomic" "github.com/kopia/kopia/snapshot/snapshotfs" "github.com/pkg/errors" @@ -41,21 +42,25 @@ var RestoreFunc = kopia.Restore // kopiaProvider recorded info related with kopiaProvider type kopiaProvider struct { - bkRepo udmrepo.BackupRepo - credGetter *credentials.CredentialGetter - log logrus.FieldLogger + requestorType string + bkRepo udmrepo.BackupRepo + credGetter *credentials.CredentialGetter + log logrus.FieldLogger + canceling int32 } // NewKopiaUploaderProvider initialized with open or create a repository func NewKopiaUploaderProvider( + requestorType string, ctx context.Context, credGetter *credentials.CredentialGetter, backupRepo *velerov1api.BackupRepository, log logrus.FieldLogger, ) (Provider, error) { kp := &kopiaProvider{ - log: log, - credGetter: credGetter, + requestorType: requestorType, + log: log, + credGetter: credGetter, } //repoUID which is used to generate kopia repository config with unique directory path repoUID := string(backupRepo.GetUID()) @@ -85,6 +90,8 @@ func (kp *kopiaProvider) CheckContext(ctx context.Context, finishChan chan struc kp.log.Infof("Action finished") return case <-ctx.Done(): + atomic.StoreInt32(&kp.canceling, 1) + if uploader != nil { uploader.Cancel() kp.log.Infof("Backup is been canceled") @@ -107,6 +114,7 @@ func (kp *kopiaProvider) RunBackup( ctx context.Context, path string, tags map[string]string, + forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) { if updater == nil { @@ -132,9 +140,19 @@ func (kp *kopiaProvider) RunBackup( close(quit) }() - snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, parentSnapshot, log) + if tags == nil { + tags = make(map[string]string) + } + tags[uploader.SnapshotRequestorTag] = kp.requestorType + + 
snapshotInfo, isSnapshotEmpty, err := BackupFunc(ctx, kpUploader, repoWriter, path, forceFull, parentSnapshot, tags, log)
 	if err != nil {
-		return "", false, errors.Wrapf(err, "Failed to run kopia backup")
+		if kpUploader.IsCanceled() {
+			log.Error("Kopia backup is canceled")
+			return "", false, ErrorCanceled
+		} else {
+			return "", false, errors.Wrapf(err, "Failed to run kopia backup")
+		}
 	} else if isSnapshotEmpty {
 		log.Debugf("Kopia backup got empty dir with path %s", path)
 		return "", true, nil
@@ -177,28 +195,33 @@ func (kp *kopiaProvider) RunRestore(
 		"volumePath": volumePath,
 	})
 	repoWriter := kopia.NewShimRepo(kp.bkRepo)
-	prorgess := new(kopia.Progress)
-	prorgess.InitThrottle(restoreProgressCheckInterval)
-	prorgess.Updater = updater
+	progress := new(kopia.Progress)
+	progress.InitThrottle(restoreProgressCheckInterval)
+	progress.Updater = updater
 
 	restoreCancel := make(chan struct{})
 	quit := make(chan struct{})
 
 	log.Info("Starting restore")
 
-	go kp.CheckContext(ctx, quit, restoreCancel, nil)
-
 	defer func() {
-		if restoreCancel != nil {
-			close(restoreCancel)
-		}
 		close(quit)
 	}()
 
-	size, fileCount, err := RestoreFunc(ctx, repoWriter, prorgess, snapshotID, volumePath, log, restoreCancel)
+	go kp.CheckContext(ctx, quit, restoreCancel, nil)
+
+	// We use the cancel channel to control restore cancellation, so don't pass a cancelable context to the Kopia restore.
+	// Otherwise, the Kopia restore would not respond to the cancel control but would return an arbitrary error instead.
+	// Kopia restore cancellation is not designed as well as Kopia backup's, which uses the context to control cancellation all the way through.
+	size, fileCount, err := RestoreFunc(context.Background(), repoWriter, progress, snapshotID, volumePath, log, restoreCancel)
 	if err != nil {
 		return errors.Wrapf(err, "Failed to run kopia restore")
 	}
 
+	if atomic.LoadInt32(&kp.canceling) == 1 {
+		log.Error("Kopia restore is canceled")
+		return ErrorCanceled
+	}
+
 	// which ensure that the statistic data of TotalBytes equal to BytesDone when finished
 	updater.UpdateProgress(&uploader.Progress{
 		TotalBytes: size,
diff --git a/pkg/uploader/provider/kopia_test.go b/pkg/uploader/provider/kopia_test.go
index 1fd8c2b7bc..207a3ed036 100644
--- a/pkg/uploader/provider/kopia_test.go
+++ b/pkg/uploader/provider/kopia_test.go
@@ -40,26 +40,26 @@ func TestRunBackup(t *testing.T) {
 	updater := FakeBackupProgressUpdater{PodVolumeBackup: &velerov1api.PodVolumeBackup{}, Log: kp.log, Ctx: context.Background(), Cli: fake.NewClientBuilder().WithScheme(scheme.Scheme).Build()}
 	testCases := []struct {
 		name           string
-		hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
+		hookBackupFunc func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error)
 		notError       bool
 	}{
 		{
 			name: "success to backup",
-			hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
+			hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) {
+				return
&uploader.SnapshotInfo{}, false, nil }, notError: true, }, { name: "get error to backup", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { return &uploader.SnapshotInfo{}, false, errors.New("failed to backup") }, notError: false, }, { name: "got empty snapshot", - hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath, parentSnapshot string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { + hookBackupFunc: func(ctx context.Context, fsUploader *snapshotfs.Uploader, repoWriter repo.RepositoryWriter, sourcePath string, forceFull bool, parentSnapshot string, tags map[string]string, log logrus.FieldLogger) (*uploader.SnapshotInfo, bool, error) { return nil, true, errors.New("snapshot is empty") }, notError: false, @@ -68,7 +68,7 @@ func TestRunBackup(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { BackupFunc = tc.hookBackupFunc - _, _, err := kp.RunBackup(context.Background(), "var", nil, "", &updater) + _, _, err := kp.RunBackup(context.Background(), "var", nil, false, "", &updater) if tc.notError { assert.NoError(t, err) } else { diff --git a/pkg/uploader/provider/mocks/Provider.go b/pkg/uploader/provider/mocks/Provider.go new file mode 100644 index 0000000000..dbb52b7fb4 --- /dev/null +++ b/pkg/uploader/provider/mocks/Provider.go @@ -0,0 +1,90 @@ +// Code generated by mockery v2.22.1. DO NOT EDIT. 
+ +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + + uploader "github.com/vmware-tanzu/velero/pkg/uploader" +) + +// Provider is an autogenerated mock type for the Provider type +type Provider struct { + mock.Mock +} + +// Close provides a mock function with given fields: ctx +func (_m *Provider) Close(ctx context.Context) error { + ret := _m.Called(ctx) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// RunBackup provides a mock function with given fields: ctx, path, tags, forceFull, parentSnapshot, updater +func (_m *Provider) RunBackup(ctx context.Context, path string, tags map[string]string, forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) { + ret := _m.Called(ctx, path, tags, forceFull, parentSnapshot, updater) + + var r0 string + var r1 bool + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) (string, bool, error)); ok { + return rf(ctx, path, tags, forceFull, parentSnapshot, updater) + } + if rf, ok := ret.Get(0).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) string); ok { + r0 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) bool); ok { + r1 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) + } else { + r1 = ret.Get(1).(bool) + } + + if rf, ok := ret.Get(2).(func(context.Context, string, map[string]string, bool, string, uploader.ProgressUpdater) error); ok { + r2 = rf(ctx, path, tags, forceFull, parentSnapshot, updater) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// RunRestore provides a mock function with given fields: ctx, snapshotID, volumePath, updater +func (_m *Provider) RunRestore(ctx context.Context, snapshotID string, volumePath string, updater uploader.ProgressUpdater) error { + ret := _m.Called(ctx, snapshotID, volumePath, updater) + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, uploader.ProgressUpdater) error); ok { + r0 = rf(ctx, snapshotID, volumePath, updater) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +type mockConstructorTestingTNewProvider interface { + mock.TestingT + Cleanup(func()) +} + +// NewProvider creates a new instance of Provider. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
+func NewProvider(t mockConstructorTestingTNewProvider) *Provider { + mock := &Provider{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/uploader/provider/provider.go b/pkg/uploader/provider/provider.go index df017ac1b6..f38a69cba1 100644 --- a/pkg/uploader/provider/provider.go +++ b/pkg/uploader/provider/provider.go @@ -29,14 +29,14 @@ import ( "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" - "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) const restoreProgressCheckInterval = 10 * time.Second const backupProgressCheckInterval = 10 * time.Second +var ErrorCanceled error = errors.New("uploader is canceled") + // Provider which is designed for one pod volume to do the backup or restore type Provider interface { // RunBackup which will do backup for one specific volume and return snapshotID, isSnapshotEmpty, error @@ -45,6 +45,7 @@ type Provider interface { ctx context.Context, path string, tags map[string]string, + forceFull bool, parentSnapshot string, updater uploader.ProgressUpdater) (string, bool, error) // RunRestore which will do restore for one specific volume with given snapshot id and return error @@ -63,6 +64,7 @@ func NewUploaderProvider( ctx context.Context, client client.Client, uploaderType string, + requestorType string, repoIdentifier string, bsl *velerov1api.BackupStorageLocation, backupRepo *velerov1api.BackupRepository, @@ -70,20 +72,16 @@ func NewUploaderProvider( repoKeySelector *v1.SecretKeySelector, log logrus.FieldLogger, ) (Provider, error) { + if requestorType == "" { + return nil, errors.New("requestor type is empty") + } + if credGetter.FromFile == nil { return nil, errors.New("uninitialized FileStore credentail is not supported") } if uploaderType == uploader.KopiaType { - // We use the hardcode repositoryType velerov1api.BackupRepositoryTypeKopia for now, because we have only one implementation of unified repo. - // TODO: post v1.10, replace the hardcode. In future, when we have multiple implementations of Unified Repo (besides Kopia), we will add the - // repositoryType to BSL, because by then, we are not able to hardcode the repositoryType to BackupRepositoryTypeKopia for Unified Repo. 
-		if err := provider.NewUnifiedRepoProvider(*credGetter, velerov1api.BackupRepositoryTypeKopia, log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
-			return nil, errors.Wrap(err, "failed to connect repository")
-		}
-		return NewKopiaUploaderProvider(ctx, credGetter, backupRepo, log)
-	}
-	if err := provider.NewResticRepositoryProvider(credGetter.FromFile, filesystem.NewFileSystem(), log).ConnectToRepo(ctx, provider.RepoParam{BackupLocation: bsl, BackupRepo: backupRepo}); err != nil {
-		return nil, errors.Wrap(err, "failed to connect repository")
+		return NewKopiaUploaderProvider(requestorType, ctx, credGetter, backupRepo, log)
+	} else {
+		return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
 	}
-	return NewResticUploaderProvider(repoIdentifier, bsl, credGetter, repoKeySelector, log)
 }
diff --git a/pkg/uploader/provider/restic.go b/pkg/uploader/provider/restic.go
index ce8ec49145..7b68d2e5b9 100644
--- a/pkg/uploader/provider/restic.go
+++ b/pkg/uploader/provider/restic.go
@@ -113,6 +113,7 @@ func (rp *resticProvider) RunBackup(
 	ctx context.Context,
 	path string,
 	tags map[string]string,
+	forceFull bool,
 	parentSnapshot string,
 	updater uploader.ProgressUpdater) (string, bool, error) {
 	if updater == nil {
diff --git a/pkg/uploader/provider/restic_test.go b/pkg/uploader/provider/restic_test.go
index 16d8382257..f2ba7a9708 100644
--- a/pkg/uploader/provider/restic_test.go
+++ b/pkg/uploader/provider/restic_test.go
@@ -64,7 +64,7 @@ func TestResticRunBackup(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			ResticBackupCMDFunc = tc.hookBackupFunc
-			_, _, err := rp.RunBackup(context.Background(), "var", nil, "", &updater)
+			_, _, err := rp.RunBackup(context.Background(), "var", nil, false, "", &updater)
 			rp.log.Infof("test name %v error %v", tc.name, err)
 			require.Equal(t, true, tc.errorHandleFunc(err))
 		})
diff --git a/pkg/uploader/types.go b/pkg/uploader/types.go
index 1ee2e7f640..ee565fa80a 100644
--- a/pkg/uploader/types.go
+++ b/pkg/uploader/types.go
@@ -22,8 +22,9 @@ import (
 )
 
 const (
-	ResticType = "restic"
-	KopiaType  = "kopia"
+	ResticType           = "restic"
+	KopiaType            = "kopia"
+	SnapshotRequestorTag = "snapshot-requestor"
 )
 
 // ValidateUploaderType validates if the input param is a valid uploader type.
diff --git a/pkg/util/kube/utils.go b/pkg/util/kube/utils.go
index d23597397d..35d4f84d4e 100644
--- a/pkg/util/kube/utils.go
+++ b/pkg/util/kube/utils.go
@@ -196,8 +196,8 @@ func isProvisionedByCSI(log logrus.FieldLogger, pv *corev1api.PersistentVolume, 
 	return false, nil
 }
 
-// SinglePathMatch function will be called by PVB and PVR controller to check whether pass-in volume path is valid.
-// Check whether there is only one match by the path's pattern (/host_pods/%s/volumes/*/volume_name/[mount|]).
+// SinglePathMatch checks whether the passed-in volume path is valid, i.e.,
+// whether there is only one match for the path's pattern.
func SinglePathMatch(path string, fs filesystem.Interface, log logrus.FieldLogger) (string, error) { matches, err := fs.Glob(path) if err != nil { From 9ab85892a7bddd821d1cb17c2cf74e506bcc5366 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Wed, 17 May 2023 12:13:23 +0800 Subject: [PATCH 2/2] add shared generic data path 02 Signed-off-by: Lyndon-Li --- .../pod_volume_backup_controller.go | 12 +++++------ .../pod_volume_backup_controller_test.go | 2 +- .../pod_volume_restore_controller.go | 12 +++++------ pkg/datapath/file_system.go | 21 ++++++++++--------- pkg/datapath/types.go | 2 +- 5 files changed, 25 insertions(+), 24 deletions(-) diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index 9b4a4cb8c0..71fb4850f7 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -159,7 +159,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ log.WithField("path", path.ByPath).Debugf("Found host path") if err := fsBackup.Init(ctx, pvb.Spec.BackupStorageLocation, pvb.Spec.Pod.Namespace, pvb.Spec.UploaderType, - podvolume.GetPvbRepositoryType(&pvb), r.repositoryEnsurer, r.credentialGetter); err != nil { + podvolume.GetPvbRepositoryType(&pvb), pvb.Spec.RepoIdentifier, r.repositoryEnsurer, r.credentialGetter); err != nil { return r.errorOut(ctx, &pvb, err, "error to initialize data path", log) } @@ -233,8 +233,8 @@ func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namesp log.WithError(err).Error("Async fs backup data path failed") var pvb velerov1api.PodVolumeBackup - if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil { - log.WithError(err).Warn("Failed to get PVB on failure") + if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil { + log.WithError(getErr).Warn("Failed to get PVB on failure") } else { _, _ = r.errorOut(ctx, &pvb, err, "data path backup failed", log) } @@ -248,10 +248,10 @@ func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, nam log.Warn("Async fs backup data path canceled") var pvb velerov1api.PodVolumeBackup - if err := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); err != nil { - log.WithError(err).Warn("Failed to get PVB on cancel") + if getErr := r.Client.Get(ctx, types.NamespacedName{Name: pvbName, Namespace: namespace}, &pvb); getErr != nil { + log.WithError(getErr).Warn("Failed to get PVB on cancel") } else { - _, _ = r.errorOut(ctx, &pvb, err, "data path backup canceled", log) + _, _ = r.errorOut(ctx, &pvb, errors.New("PVB is canceled"), "data path backup canceled", log) } } diff --git a/pkg/controller/pod_volume_backup_controller_test.go b/pkg/controller/pod_volume_backup_controller_test.go index ede8aa9d14..1f1e9e543d 100644 --- a/pkg/controller/pod_volume_backup_controller_test.go +++ b/pkg/controller/pod_volume_backup_controller_test.go @@ -99,7 +99,7 @@ type fakeFSBR struct { clock clock.WithTickerAndDelayedExecution } -func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { +func (b *fakeFSBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, 
credentialGetter *credentials.CredentialGetter) error { return nil } diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 3e75422d3f..a2db1ba490 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -143,7 +143,7 @@ func (c *PodVolumeRestoreReconciler) Reconcile(ctx context.Context, req ctrl.Req log.WithField("path", volumePath.ByPath).Debugf("Found host path") if err := fsRestore.Init(ctx, pvr.Spec.BackupStorageLocation, pvr.Spec.Pod.Namespace, pvr.Spec.UploaderType, - podvolume.GetPvrRepositoryType(pvr), c.repositoryEnsurer, c.credentialGetter); err != nil { + podvolume.GetPvrRepositoryType(pvr), pvr.Spec.RepoIdentifier, c.repositoryEnsurer, c.credentialGetter); err != nil { return c.errorOut(ctx, pvr, err, "error to initialize data path", log) } @@ -326,8 +326,8 @@ func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, names log.WithError(err).Info("Async fs restore data path failed") var pvr velerov1api.PodVolumeRestore - if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { - log.WithError(err).Warn("Failed to get PVR on failure") + if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { + log.WithError(getErr).Warn("Failed to get PVR on failure") } else { _, _ = c.errorOut(ctx, &pvr, err, "data path restore failed", log) } @@ -341,10 +341,10 @@ func (c *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, na log.Info("Async fs restore data path canceled") var pvr velerov1api.PodVolumeRestore - if err := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); err != nil { - log.WithError(err).Warn("Failed to get PVR on cancel") + if getErr := c.Client.Get(ctx, types.NamespacedName{Name: pvrName, Namespace: namespace}, &pvr); getErr != nil { + log.WithError(getErr).Warn("Failed to get PVR on cancel") } else { - _, _ = c.errorOut(ctx, &pvr, err, "data path restore canceled", log) + _, _ = c.errorOut(ctx, &pvr, errors.New("PVR is canceled"), "data path restore canceled", log) } } diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 6d7d915bb1..ce0e8dc0a2 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -30,6 +30,7 @@ import ( repoProvider "github.com/vmware-tanzu/velero/pkg/repository/provider" "github.com/vmware-tanzu/velero/pkg/uploader" "github.com/vmware-tanzu/velero/pkg/uploader/provider" + "github.com/vmware-tanzu/velero/pkg/util/filesystem" ) type fileSystemBR struct { @@ -61,7 +62,7 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client, } func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, - repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { + repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error { var err error defer func() { if err != nil { @@ -91,7 +92,7 @@ func (fs *fileSystemBR) Init(ctx context.Context, bslName string, sourceNamespac return errors.Wrapf(err, "error to boost backup repository connection %s-%s-%s", bslName, sourceNamespace, repositoryType) } - fs.uploaderProv, err = provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, "", + fs.uploaderProv, err = 
provider.NewUploaderProvider(ctx, fs.client, uploaderType, fs.requestorType, repoIdentifier, fs.backupLocation, fs.backupRepo, credentialGetter, repokey.RepoKeySelector(), fs.log) if err != nil { return errors.Wrapf(err, "error creating uploader %s", uploaderType) @@ -137,11 +138,11 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, parentSnapshot string, f snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, tags, forceFull, parentSnapshot, fs) if err == provider.ErrorCanceled { - fs.callbacks.OnCancelled(fs.ctx, fs.namespace, fs.jobName) + fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) } else if err != nil { - fs.callbacks.OnFailed(fs.ctx, fs.namespace, fs.jobName, err) + fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err) } else { - fs.callbacks.OnCompleted(fs.ctx, fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}}) + fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Backup: BackupResult{snapshotID, emptySnapshot, source}}) } }() @@ -157,11 +158,11 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, fs) if err == provider.ErrorCanceled { - fs.callbacks.OnCancelled(fs.ctx, fs.namespace, fs.jobName) + fs.callbacks.OnCancelled(context.Background(), fs.namespace, fs.jobName) } else if err != nil { - fs.callbacks.OnFailed(fs.ctx, fs.namespace, fs.jobName, err) + fs.callbacks.OnFailed(context.Background(), fs.namespace, fs.jobName, err) } else { - fs.callbacks.OnCompleted(fs.ctx, fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}}) + fs.callbacks.OnCompleted(context.Background(), fs.namespace, fs.jobName, Result{Restore: RestoreResult{Target: target}}) } }() @@ -171,7 +172,7 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint) erro // UpdateProgress which implement ProgressUpdater interface to update progress status func (fs *fileSystemBR) UpdateProgress(p *uploader.Progress) { if fs.callbacks.OnProgress != nil { - fs.callbacks.OnProgress(fs.ctx, fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}) + fs.callbacks.OnProgress(context.Background(), fs.namespace, fs.jobName, &uploader.Progress{TotalBytes: p.TotalBytes, BytesDone: p.BytesDone}) } } @@ -186,7 +187,7 @@ func (fs *fileSystemBR) boostRepoConnect(ctx context.Context, repositoryType str return err } } else { - if err := repoProvider.NewUnifiedRepoProvider(*credentialGetter, repositoryType, fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { + if err := repoProvider.NewResticRepositoryProvider(credentialGetter.FromFile, filesystem.NewFileSystem(), fs.log).BoostRepoConnect(ctx, repoProvider.RepoParam{BackupLocation: fs.backupLocation, BackupRepo: fs.backupRepo}); err != nil { return err } } diff --git a/pkg/datapath/types.go b/pkg/datapath/types.go index 97926ca154..0b1e47c121 100644 --- a/pkg/datapath/types.go +++ b/pkg/datapath/types.go @@ -58,7 +58,7 @@ type AccessPoint struct { // AsyncBR is the interface for asynchronous data path methods type AsyncBR interface { // Init initializes an asynchronous data path instance - Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) 
error + Init(ctx context.Context, bslName string, sourceNamespace string, uploaderType string, repositoryType string, repoIdentifier string, repositoryEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter) error // StartBackup starts an asynchronous data path instance for backup StartBackup(source AccessPoint, parentSnapshot string, forceFull bool, tags map[string]string) error