From d48e9762eb952159ab15173dad871e6322e5bee0 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Wed, 31 Jul 2024 11:52:26 +0800 Subject: [PATCH 1/3] data mover ms new controller Signed-off-by: Lyndon-Li --- pkg/builder/data_download_builder.go | 6 ++ pkg/cmd/cli/datamover/backup.go | 3 +- pkg/cmd/cli/nodeagent/server.go | 6 +- pkg/controller/data_download_controller.go | 59 +++++++--------- .../data_download_controller_test.go | 26 +++---- pkg/controller/data_upload_controller.go | 68 +++++++------------ pkg/controller/data_upload_controller_test.go | 8 ++- pkg/datamover/backup_micro_service.go | 2 +- pkg/util/logging/default_logger.go | 7 ++ pkg/util/logging/default_logger_test.go | 13 ++++ 10 files changed, 97 insertions(+), 101 deletions(-) diff --git a/pkg/builder/data_download_builder.go b/pkg/builder/data_download_builder.go index 9a85c79056..51dd90e064 100644 --- a/pkg/builder/data_download_builder.go +++ b/pkg/builder/data_download_builder.go @@ -111,6 +111,12 @@ func (d *DataDownloadBuilder) ObjectMeta(opts ...ObjectMetaOpt) *DataDownloadBui return d } +// Labels sets the DataDownload's Labels. +func (d *DataDownloadBuilder) Labels(labels map[string]string) *DataDownloadBuilder { + d.object.Labels = labels + return d +} + // StartTimestamp sets the DataDownload's StartTimestamp. 
func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownloadBuilder { d.object.Status.StartTimestamp = startTime diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go index 2027b08931..35f483d921 100644 --- a/pkg/cmd/cli/datamover/backup.go +++ b/pkg/cmd/cli/datamover/backup.go @@ -207,8 +207,7 @@ func (s *dataMoverBackup) run() { } }() - // TODOOO: call s.runDataPath() - time.Sleep(time.Duration(1<<63 - 1)) + s.runDataPath() } func (s *dataMoverBackup) runDataPath() { diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 61dd0b0063..e19e1ab7f5 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -104,7 +104,7 @@ func NewServerCommand(f client.Factory) *cobra.Command { logLevel := logLevelFlag.Parse() logrus.Infof("Setting log-level to %s", strings.ToUpper(logLevel.String())) - logger := logging.DefaultLogger(logLevel, formatFlag.Parse()) + logger := logging.DefaultMergeLogger(logLevel, formatFlag.Parse()) logger.Infof("Starting Velero node-agent server %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA()) f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name())) @@ -292,13 +292,13 @@ func (s *nodeAgentServer) run() { if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { loadAffinity = s.dataPathConfigs.LoadAffinity[0] } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) 
s.attemptDataUploadResume(dataUploadReconciler) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) s.attemptDataDownloadResume(dataDownloadReconciler) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index c8e8cca500..bf3c8e7dfb 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -56,6 +57,7 @@ import ( type DataDownloadReconciler struct { client client.Client kubeClient kubernetes.Interface + mgr manager.Manager logger logrus.FieldLogger credentialGetter *credentials.CredentialGetter fileSystem filesystem.Interface @@ -68,11 +70,12 @@ type DataDownloadReconciler struct { metrics *metrics.ServerMetrics } -func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, +func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, repoEnsurer 
*repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ client: client, kubeClient: kubeClient, + mgr: mgr, logger: logger.WithField("controller", "DataDownload"), credentialGetter: credentialGetter, fileSystem: filesystem.NewFileSystem(), @@ -234,9 +237,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{}, nil } - fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name) + asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name) - if fsRestore != nil { + if asyncBR != nil { log.Info("Cancellable data path is already started") return ctrl.Result{}, nil } @@ -259,7 +262,8 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request OnProgress: r.OnDataDownloadProgress, } - fsRestore, err = r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log) + asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, + dd.Name, dd.Namespace, result.ByPod.HostingPod.Name, result.ByPod.HostingContainer, dd.Name, callbacks, false, log) if err != nil { if err == datapath.ConcurrentLimitExceed { log.Info("Data path instance is concurrent limited requeue later") @@ -279,7 +283,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request log.Info("Data download is marked as in progress") - reconcileResult, err := r.runCancelableDataPath(ctx, fsRestore, dd, result, log) + reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log) if err != nil { log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err) r.closeDataPath(ctx, dd.Name) @@ -289,8 +293,8 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request log.Info("Data download is in progress") 
if dd.Spec.Cancel { log.Info("Data download is being canceled") - fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name) - if fsRestore == nil { + asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name) + if asyncBR == nil { if r.nodeName == dd.Status.Node { r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName()) } else { @@ -306,7 +310,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request log.WithError(err).Error("error updating data download status") return ctrl.Result{}, err } - fsRestore.Cancel() + asyncBR.Cancel() return ctrl.Result{}, nil } @@ -327,33 +331,20 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } } -func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, fsRestore datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { - path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log) - if err != nil { - return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log) - } - - log.WithField("path", path.ByPath).Debug("Found host path") - - if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{ - BSLName: dd.Spec.BackupStorageLocation, - SourceNamespace: dd.Spec.SourceNamespace, - UploaderType: datamover.GetUploaderType(dd.Spec.DataMover), - RepositoryType: velerov1api.BackupRepositoryTypeKopia, - RepoIdentifier: "", - RepositoryEnsurer: r.repositoryEnsurer, - CredentialGetter: r.credentialGetter, - }); err != nil { - return r.errorOut(ctx, dd, err, "error to initialize data path", log) +func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { + if err := asyncBR.Init(ctx, nil); err != nil { + return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log) } - 
log.WithField("path", path.ByPath).Info("fs init") + log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) - if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path, dd.Spec.DataMoverConfig); err != nil { - return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting data path %s restore", path.ByPath), log) + if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{ + ByPath: res.ByPod.VolumeName, + }, dd.Spec.DataMoverConfig); err != nil { + return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) } - log.WithField("path", path.ByPath).Info("Async fs restore data path started") + log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) return ctrl.Result{}, nil } @@ -561,7 +552,7 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context, log.WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout") return []reconcile.Request{} } - log.Info("Exposed pod is in abnormal status, and datadownload is marked as cancel") + log.Infof("Exposed pod is in abnormal status(reason %s) and datadownload is marked as cancel", reason) } else { return []reconcile.Request{} } @@ -754,9 +745,9 @@ func (r *DataDownloadReconciler) getTargetPVC(ctx context.Context, dd *velerov2a } func (r *DataDownloadReconciler) closeDataPath(ctx context.Context, ddName string) { - fsBackup := r.dataPathMgr.GetAsyncBR(ddName) - if fsBackup != nil { - fsBackup.Close(ctx) + asyncBR := r.dataPathMgr.GetAsyncBR(ddName) + if asyncBR != nil { + asyncBR.Close(ctx) } r.dataPathMgr.RemoveAsyncBR(ddName) diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 7c7c5dbefe..c6c2697f7a 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -33,10 
+33,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clientgofake "k8s.io/client-go/kubernetes/fake" ctrl "sigs.k8s.io/controller-runtime" kbclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -149,7 +151,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... dataPathMgr := datapath.NewManager(1) - return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { @@ -261,14 +263,6 @@ func TestDataDownloadReconcile(t *testing.T) { notMockCleanUp: true, expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, - { - name: "Error getting volume directory name for pvc in pod", - dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), - targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - notNilExpose: true, - mockClose: true, - expectedStatusMsg: "error identifying unique volume path on host", - }, { name: "Unable to update status to in progress for data download", dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), @@ -402,17 +396,18 @@ func TestDataDownloadReconcile(t *testing.T) { r.dataPathMgr = datapath.NewManager(1) } - datapath.FSBRCreator = func(string, string, 
kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { - fsBR := datapathmockes.NewAsyncBR(t) + datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, + string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + asyncBR := datapathmockes.NewAsyncBR(t) if test.mockCancel { - fsBR.On("Cancel").Return() + asyncBR.On("Cancel").Return() } if test.mockClose { - fsBR.On("Close", mock.Anything).Return() + asyncBR.On("Close", mock.Anything).Return() } - return fsBR + return asyncBR } if test.isExposeErr || test.isGetExposeErr || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose { @@ -443,7 +438,8 @@ func TestDataDownloadReconcile(t *testing.T) { if test.needCreateFSBR { if fsBR := r.dataPathMgr.GetAsyncBR(test.dd.Name); fsBR == nil { - _, err := r.dataPathMgr.CreateFileSystemBR(test.dd.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, velerotest.NewLogger()) + _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeRestore, test.dd.Name, pVBRRequestor, + velerov1api.DefaultNamespace, "", "", datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, false, velerotest.NewLogger()) require.NoError(t, err) } } diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 4781f04f40..068945870e 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -35,6 +35,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -67,6 +68,7 @@ type DataUploadReconciler struct { client client.Client kubeClient 
kubernetes.Interface csiSnapshotClient snapshotter.SnapshotV1Interface + mgr manager.Manager repoEnsurer *repository.Ensurer Clock clocks.WithTickerAndDelayedExecution credentialGetter *credentials.CredentialGetter @@ -80,11 +82,12 @@ type DataUploadReconciler struct { metrics *metrics.ServerMetrics } -func NewDataUploadReconciler(client client.Client, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, +func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, + mgr: mgr, kubeClient: kubeClient, csiSnapshotClient: csiSnapshotClient, Clock: clock, @@ -246,8 +249,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return ctrl.Result{}, nil } - fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) - if fsBackup != nil { + asyncBR := r.dataPathMgr.GetAsyncBR(du.Name) + if asyncBR != nil { log.Info("Cancellable data path is already started") return ctrl.Result{}, nil } @@ -270,7 +273,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) OnProgress: r.OnDataUploadProgress, } - fsBackup, err = r.dataPathMgr.CreateFileSystemBR(du.Name, dataUploadDownloadRequestor, ctx, r.client, du.Namespace, callbacks, log) + asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeBackup, + du.Name, du.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, du.Name, callbacks, false, log) if err != nil { if err == datapath.ConcurrentLimitExceed { log.Info("Data path 
instance is concurrent limited requeue later") @@ -288,7 +292,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } log.Info("Data upload is marked as in progress") - result, err := r.runCancelableDataUpload(ctx, fsBackup, du, res, log) + result, err := r.runCancelableDataUpload(ctx, asyncBR, du, res, log) if err != nil { log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err) r.closeDataPath(ctx, du.Name) @@ -299,8 +303,8 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) if du.Spec.Cancel { log.Info("Data upload is being canceled") - fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) - if fsBackup == nil { + asyncBR := r.dataPathMgr.GetAsyncBR(du.Name) + if asyncBR == nil { if du.Status.Node == r.nodeName { r.OnDataUploadCancelled(ctx, du.GetNamespace(), du.GetName()) } else { @@ -316,7 +320,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) log.WithError(err).Error("error updating data upload into canceling status") return ctrl.Result{}, err } - fsBackup.Cancel() + asyncBR.Cancel() return ctrl.Result{}, nil } return ctrl.Result{}, nil @@ -336,44 +340,22 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } -func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, fsBackup datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { +func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { log.Info("Run cancelable dataUpload") - path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log) - if err != nil { - return r.errorOut(ctx, du, err, "error exposing host path for pod volume", log) - } - - 
log.WithField("path", path.ByPath).Debug("Found host path") - - if err := fsBackup.Init(ctx, &datapath.FSBRInitParam{ - BSLName: du.Spec.BackupStorageLocation, - SourceNamespace: du.Spec.SourceNamespace, - UploaderType: datamover.GetUploaderType(du.Spec.DataMover), - RepositoryType: velerov1api.BackupRepositoryTypeKopia, - RepoIdentifier: "", - RepositoryEnsurer: r.repoEnsurer, - CredentialGetter: r.credentialGetter, - }); err != nil { - return r.errorOut(ctx, du, err, "error to initialize data path", log) + if err := asyncBR.Init(ctx, nil); err != nil { + return r.errorOut(ctx, du, err, "error to initialize asyncBR", log) } - log.WithField("path", path.ByPath).Info("fs init") - - tags := map[string]string{ - velerov1api.AsyncOperationIDLabel: du.Labels[velerov1api.AsyncOperationIDLabel], - } + log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) - if err := fsBackup.StartBackup(path, du.Spec.DataMoverConfig, &datapath.FSBRStartParam{ - RealSource: datamover.GetRealSource(du.Spec.SourceNamespace, du.Spec.SourcePVC), - ParentSnapshot: "", - ForceFull: false, - Tags: tags, - }); err != nil { - return r.errorOut(ctx, du, err, "error starting data path backup", log) + if err := asyncBR.StartBackup(datapath.AccessPoint{ + ByPath: res.ByPod.VolumeName, + }, du.Spec.DataMoverConfig, nil); err != nil { + return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) } - log.WithField("path", path.ByPath).Info("Async fs backup data path started") + log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) return ctrl.Result{}, nil } @@ -608,7 +590,7 @@ func (r *DataUploadReconciler) findDataUploadForPod(ctx context.Context, podObj log.WithError(err).Warn("failed to cancel dataupload, and it will wait for prepare timeout") return []reconcile.Request{} } - log.Info("Exposed pod is in abnormal status and 
dataupload is marked as cancel") + log.Infof("Exposed pod is in abnormal status(reason %s) and dataupload is marked as cancel", reason) } else { return []reconcile.Request{} } @@ -820,9 +802,9 @@ func (r *DataUploadReconciler) exclusiveUpdateDataUpload(ctx context.Context, du } func (r *DataUploadReconciler) closeDataPath(ctx context.Context, duName string) { - fsBackup := r.dataPathMgr.GetAsyncBR(duName) - if fsBackup != nil { - fsBackup.Close(ctx) + asyncBR := r.dataPathMgr.GetAsyncBR(duName) + if asyncBR != nil { + asyncBR.Close(ctx) } r.dataPathMgr.RemoveAsyncBR(duName) diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 2bca1d5b98..ee73372b1b 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -35,6 +35,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" clientgofake "k8s.io/client-go/kubernetes/fake" "k8s.io/utils/clock" testclocks "k8s.io/utils/clock/testing" @@ -42,6 +43,7 @@ import ( kbclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/vmware-tanzu/velero/internal/credentials" @@ -241,7 +243,7 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci if err != nil { return nil, err } - return NewDataUploadReconciler(fakeClient, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, + return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), 
metrics.NewServerMetrics()), nil } @@ -548,7 +550,7 @@ func TestReconcile(t *testing.T) { r.snapshotExposerList = map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(r.kubeClient, r.csiSnapshotClient, velerotest.NewLogger())} } if !test.notCreateFSBR { - datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { + datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { return &fakeDataUploadFSBR{ du: test.du, kubeClient: r.client, @@ -559,7 +561,7 @@ func TestReconcile(t *testing.T) { if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR { if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { - _, err := r.dataPathMgr.CreateFileSystemBR(test.du.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, velerotest.NewLogger()) + _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger()) require.NoError(t, err) } } diff --git a/pkg/datamover/backup_micro_service.go b/pkg/datamover/backup_micro_service.go index 2f4fa6e56d..9db00b529f 100644 --- a/pkg/datamover/backup_micro_service.go +++ b/pkg/datamover/backup_micro_service.go @@ -103,7 +103,7 @@ func (r *BackupMicroService) Init() error { oldDu := oldObj.(*velerov2alpha1api.DataUpload) newDu := newObj.(*velerov2alpha1api.DataUpload) - if newDu.Name != r.dataUpload.Name { + if newDu.Name != r.dataUploadName { return } diff --git a/pkg/util/logging/default_logger.go b/pkg/util/logging/default_logger.go index b374c3d847..f745e09ac8 
100644 --- a/pkg/util/logging/default_logger.go +++ b/pkg/util/logging/default_logger.go @@ -43,6 +43,13 @@ func DefaultLogger(level logrus.Level, format Format) *logrus.Logger { return createLogger(level, format, false) } +// DefaultMergeLogger returns a Logger with the default properties +// and hooks, and also a hook to support log merge. +// The desired output format is passed as a LogFormat Enum. +func DefaultMergeLogger(level logrus.Level, format Format) *logrus.Logger { + return createLogger(level, format, true) +} + func createLogger(level logrus.Level, format Format, merge bool) *logrus.Logger { logger := logrus.New() diff --git a/pkg/util/logging/default_logger_test.go b/pkg/util/logging/default_logger_test.go index 10f6907578..148a08e9d0 100644 --- a/pkg/util/logging/default_logger_test.go +++ b/pkg/util/logging/default_logger_test.go @@ -38,3 +38,16 @@ func TestDefaultLogger(t *testing.T) { } } } + +func TestDefaultMergeLogger(t *testing.T) { + formatFlag := NewFormatFlag() + + for _, testFormat := range formatFlag.AllowedValues() { + formatFlag.Set(testFormat) + logger := DefaultMergeLogger(logrus.InfoLevel, formatFlag.Parse()) + assert.Equal(t, logrus.InfoLevel, logger.Level) + assert.Equal(t, os.Stdout, logger.Out) + + assert.Equal(t, DefaultHooks(true), logger.Hooks[ListeningLevel]) + } +} From 514ba56ca1765c147437e430abd9691e70263e2c Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 1 Aug 2024 13:07:22 +0800 Subject: [PATCH 2/3] data mover ms new controller Signed-off-by: Lyndon-Li --- pkg/cmd/cli/datamover/restore.go | 3 +-- pkg/controller/data_download_controller.go | 4 ++-- pkg/controller/data_upload_controller.go | 4 ++-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/cmd/cli/datamover/restore.go b/pkg/cmd/cli/datamover/restore.go index fbd92fa189..fb46abd409 100644 --- a/pkg/cmd/cli/datamover/restore.go +++ b/pkg/cmd/cli/datamover/restore.go @@ -198,8 +198,7 @@ func (s *dataMoverRestore) run() { } }() - // TODOOO: call 
s.runDataPath() - time.Sleep(time.Duration(1<<63 - 1)) + s.runDataPath() } func (s *dataMoverRestore) runDataPath() { diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index bf3c8e7dfb..abb1da2ae9 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -336,7 +336,7 @@ func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyn return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log) } - log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) + log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, @@ -344,7 +344,7 @@ func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyn return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) } - log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) + log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) return ctrl.Result{}, nil } diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 068945870e..7cc7ee41c2 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -347,7 +347,7 @@ func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyn return r.errorOut(ctx, du, err, "error to initialize asyncBR", log) } - log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) + log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) if err := asyncBR.StartBackup(datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, 
@@ -355,7 +355,7 @@ func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyn return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) } - log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) + log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) return ctrl.Result{}, nil } From 903458b61b7831f2b0609b6a8e6e9e92e2ae29a6 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 1 Aug 2024 14:58:19 +0800 Subject: [PATCH 3/3] data mover ms new controller Signed-off-by: Lyndon-Li --- changelogs/unreleased/8074-Lyndon-Li | 1 + pkg/controller/data_download_controller.go | 12 +++++++++--- pkg/controller/data_upload_controller.go | 12 +++++++++--- 3 files changed, 19 insertions(+), 6 deletions(-) create mode 100644 changelogs/unreleased/8074-Lyndon-Li diff --git a/changelogs/unreleased/8074-Lyndon-Li b/changelogs/unreleased/8074-Lyndon-Li new file mode 100644 index 0000000000..ea7acad683 --- /dev/null +++ b/changelogs/unreleased/8074-Lyndon-Li @@ -0,0 +1 @@ +Data mover micro service DUCR/DDCR controller refactor according to design #7576 \ No newline at end of file diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index abb1da2ae9..f0c0c1728d 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -349,7 +349,9 @@ func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyn } func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer r.closeDataPath(ctx, ddName) + defer func() { + go r.closeDataPath(ctx, ddName) + }() log := r.logger.WithField("datadownload", ddName) log.Info("Async fs restore data path completed") @@ -382,7 +384,9 @@ func (r *DataDownloadReconciler) 
OnDataDownloadCompleted(ctx context.Context, na } func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer r.closeDataPath(ctx, ddName) + defer func() { + go r.closeDataPath(ctx, ddName) + }() log := r.logger.WithField("datadownload", ddName) @@ -399,7 +403,9 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names } func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { - defer r.closeDataPath(ctx, ddName) + defer func() { + go r.closeDataPath(ctx, ddName) + }() log := r.logger.WithField("datadownload", ddName) diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 7cc7ee41c2..91413a8ccf 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -360,7 +360,9 @@ func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyn } func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer r.closeDataPath(ctx, duName) + defer func() { + go r.closeDataPath(ctx, duName) + }() log := r.logger.WithField("dataupload", duName) @@ -404,7 +406,9 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp } func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) { - defer r.closeDataPath(ctx, duName) + defer func() { + go r.closeDataPath(ctx, duName) + }() log := r.logger.WithField("dataupload", duName) @@ -421,7 +425,9 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace } func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer r.closeDataPath(ctx, duName) + defer func() { + go r.closeDataPath(ctx, duName) + }() log := r.logger.WithField("dataupload", duName)