Data mover micro service new controller #8074

Merged
1 change: 1 addition & 0 deletions changelogs/unreleased/8074-Lyndon-Li
@@ -0,0 +1 @@
Data mover micro service DUCR/DDCR controller refactor according to design #7576
6 changes: 6 additions & 0 deletions pkg/builder/data_download_builder.go
@@ -111,6 +111,12 @@ func (d *DataDownloadBuilder) ObjectMeta(opts ...ObjectMetaOpt) *DataDownloadBuilder {
return d
}

// Labels sets the DataDownload's Labels.
func (d *DataDownloadBuilder) Labels(labels map[string]string) *DataDownloadBuilder {
d.object.Labels = labels
return d
}

// StartTimestamp sets the DataDownload's StartTimestamp.
func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownloadBuilder {
d.object.Status.StartTimestamp = startTime
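A note on usage: the new Labels option chains like the other DataDownloadBuilder setters. A minimal sketch, assuming the package's existing ForDataDownload(namespace, name) constructor and an illustrative label key:

package main

import (
	"fmt"

	"github.com/vmware-tanzu/velero/pkg/builder"
)

func main() {
	// Attach labels while building a DataDownload; the key/value here are
	// only illustrative, not anything mandated by this PR.
	dd := builder.ForDataDownload("velero", "dd-1").
		Labels(map[string]string{"velero.io/example": "demo"}).
		Result()

	fmt.Println(dd.Name, dd.Labels)
}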
3 changes: 1 addition & 2 deletions pkg/cmd/cli/datamover/backup.go
@@ -207,8 +207,7 @@
}
}()

// TODOOO: call s.runDataPath()
time.Sleep(time.Duration(1<<63 - 1))
s.runDataPath()

}

func (s *dataMoverBackup) runDataPath() {
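For context on the placeholder being removed here (and in restore.go below): time.Duration(1<<63 - 1) is the maximum Duration value (math.MaxInt64 nanoseconds, roughly 292 years), so the old code simply parked the goroutine forever where the new code now runs the data path. A quick self-contained check:

package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	d := time.Duration(1<<63 - 1)
	fmt.Println(d == time.Duration(math.MaxInt64)) // true
	fmt.Println(d)                                 // 2562047h47m16.854775807s, about 292 years
}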
3 changes: 1 addition & 2 deletions pkg/cmd/cli/datamover/restore.go
@@ -198,8 +198,7 @@
}
}()

// TODOOO: call s.runDataPath()
time.Sleep(time.Duration(1<<63 - 1))
s.runDataPath()

}

func (s *dataMoverRestore) runDataPath() {
6 changes: 3 additions & 3 deletions pkg/cmd/cli/nodeagent/server.go
@@ -104,7 +104,7 @@
logLevel := logLevelFlag.Parse()
logrus.Infof("Setting log-level to %s", strings.ToUpper(logLevel.String()))

logger := logging.DefaultLogger(logLevel, formatFlag.Parse())
logger := logging.DefaultMergeLogger(logLevel, formatFlag.Parse())

logger.Infof("Starting Velero node-agent server %s (%s)", buildinfo.Version, buildinfo.FormattedGitSHA())

f.SetBasename(fmt.Sprintf("%s-%s", c.Parent().Name(), c.Name()))
@@ -292,13 +292,13 @@
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0]
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)

s.attemptDataUploadResume(dataUploadReconciler)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)

s.attemptDataDownloadResume(dataDownloadReconciler)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
71 changes: 34 additions & 37 deletions pkg/controller/data_download_controller.go
@@ -35,6 +35,7 @@
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

@@ -56,6 +57,7 @@
type DataDownloadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
@@ -68,11 +70,12 @@
metrics *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
@@ -234,9 +237,9 @@
return ctrl.Result{}, nil
}

fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name)

if fsRestore != nil {
if asyncBR != nil {
log.Info("Cancellable data path is already started")
return ctrl.Result{}, nil
}
@@ -259,7 +262,8 @@
OnProgress: r.OnDataDownloadProgress,
}

fsRestore, err = r.dataPathMgr.CreateFileSystemBR(dd.Name, dataUploadDownloadRequestor, ctx, r.client, dd.Namespace, callbacks, log)
asyncBR, err = r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore,
dd.Name, dd.Namespace, result.ByPod.HostingPod.Name, result.ByPod.HostingContainer, dd.Name, callbacks, false, log)
if err != nil {
if err == datapath.ConcurrentLimitExceed {
log.Info("Data path instance is concurrent limited requeue later")
@@ -279,7 +283,7 @@

log.Info("Data download is marked as in progress")

reconcileResult, err := r.runCancelableDataPath(ctx, fsRestore, dd, result, log)
reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log)

if err != nil {
log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err)
r.closeDataPath(ctx, dd.Name)
@@ -289,8 +293,8 @@
log.Info("Data download is in progress")
if dd.Spec.Cancel {
log.Info("Data download is being canceled")
fsRestore := r.dataPathMgr.GetAsyncBR(dd.Name)
if fsRestore == nil {
asyncBR := r.dataPathMgr.GetAsyncBR(dd.Name)
if asyncBR == nil {
if r.nodeName == dd.Status.Node {
r.OnDataDownloadCancelled(ctx, dd.GetNamespace(), dd.GetName())
} else {
@@ -306,7 +310,7 @@
log.WithError(err).Error("error updating data download status")
return ctrl.Result{}, err
}
fsRestore.Cancel()
asyncBR.Cancel()
return ctrl.Result{}, nil
}

@@ -327,38 +331,27 @@
}
}

func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, fsRestore datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
path, err := exposer.GetPodVolumeHostPath(ctx, res.ByPod.HostingPod, res.ByPod.VolumeName, r.client, r.fileSystem, log)
if err != nil {
return r.errorOut(ctx, dd, err, "error exposing host path for pod volume", log)
}

log.WithField("path", path.ByPath).Debug("Found host path")

if err := fsRestore.Init(ctx, &datapath.FSBRInitParam{
BSLName: dd.Spec.BackupStorageLocation,
SourceNamespace: dd.Spec.SourceNamespace,
UploaderType: datamover.GetUploaderType(dd.Spec.DataMover),
RepositoryType: velerov1api.BackupRepositoryTypeKopia,
RepoIdentifier: "",
RepositoryEnsurer: r.repositoryEnsurer,
CredentialGetter: r.credentialGetter,
}); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize data path", log)
func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) {
if err := asyncBR.Init(ctx, nil); err != nil {
return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log)

}

log.WithField("path", path.ByPath).Info("fs init")
log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName)


if err := fsRestore.StartRestore(dd.Spec.SnapshotID, path, dd.Spec.DataMoverConfig); err != nil {
return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting data path %s restore", path.ByPath), log)
if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{
ByPath: res.ByPod.VolumeName,
}, dd.Spec.DataMoverConfig); err != nil {
return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log)

}

log.WithField("path", path.ByPath).Info("Async fs restore data path started")
log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName)

return ctrl.Result{}, nil
}
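For orientation, the refactored runCancelableDataPath above no longer resolves a host path and drives an in-process filesystem restore; it initializes the micro-service watcher and starts the restore in the hosting pod, with completion reported back through the registered callbacks. A rough, simplified sketch of that asynchronous flow follows; the interface below is a stand-in whose names and signatures are assumptions, not Velero's actual datapath.AsyncBR definition:

package main

import (
	"context"
	"fmt"
)

// Stand-in for the asynchronous backup/restore surface the controller drives.
type asyncBR interface {
	Init(ctx context.Context, param any) error
	StartRestore(snapshotID, volume string, config map[string]string) error
	Close(ctx context.Context)
}

// fakeBR completes via a callback, the way the real watcher reports
// completion from the hosting pod.
type fakeBR struct{ onCompleted func() }

func (f *fakeBR) Init(ctx context.Context, param any) error { return nil }
func (f *fakeBR) StartRestore(snapshotID, volume string, config map[string]string) error {
	go f.onCompleted() // only kicks the work off; completion arrives asynchronously
	return nil
}
func (f *fakeBR) Close(ctx context.Context) {}

func main() {
	done := make(chan struct{})
	var br asyncBR = &fakeBR{onCompleted: func() { close(done) }}

	ctx := context.Background()
	if err := br.Init(ctx, nil); err != nil {
		panic(err)
	}
	if err := br.StartRestore("snap-1", "data-volume", nil); err != nil {
		panic(err)
	}
	<-done // in the controller, OnDataDownloadCompleted plays this role
	br.Close(ctx)
	fmt.Println("restore finished")
}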

func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) {
defer r.closeDataPath(ctx, ddName)
defer func() {
go r.closeDataPath(ctx, ddName)
}()

log := r.logger.WithField("datadownload", ddName)
log.Info("Async fs restore data path completed")
@@ -391,7 +384,9 @@
}

func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) {
defer r.closeDataPath(ctx, ddName)
defer func() {
go r.closeDataPath(ctx, ddName)
}()

log := r.logger.WithField("datadownload", ddName)

@@ -408,7 +403,9 @@
}

func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) {
defer r.closeDataPath(ctx, ddName)
defer func() {
go r.closeDataPath(ctx, ddName)
}()

log := r.logger.WithField("datadownload", ddName)

@@ -561,7 +558,7 @@
log.WithError(err).Warn("failed to cancel datadownload, and it will wait for prepare timeout")
return []reconcile.Request{}
}
log.Info("Exposed pod is in abnormal status, and datadownload is marked as cancel")
log.Infof("Exposed pod is in abnormal status(reason %s) and datadownload is marked as cancel", reason)

} else {
return []reconcile.Request{}
}
@@ -754,9 +751,9 @@
}

func (r *DataDownloadReconciler) closeDataPath(ctx context.Context, ddName string) {
fsBackup := r.dataPathMgr.GetAsyncBR(ddName)
if fsBackup != nil {
fsBackup.Close(ctx)
asyncBR := r.dataPathMgr.GetAsyncBR(ddName)
if asyncBR != nil {
asyncBR.Close(ctx)

}

r.dataPathMgr.RemoveAsyncBR(ddName)
26 changes: 11 additions & 15 deletions pkg/controller/data_download_controller_test.go
@@ -33,10 +33,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
clientgofake "k8s.io/client-go/kubernetes/fake"
ctrl "sigs.k8s.io/controller-runtime"
kbclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -149,7 +151,7 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ...

dataPathMgr := datapath.NewManager(1)

return NewDataDownloadReconciler(fakeClient, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil
}

func TestDataDownloadReconcile(t *testing.T) {
Expand Down Expand Up @@ -261,14 +263,6 @@ func TestDataDownloadReconcile(t *testing.T) {
notMockCleanUp: true,
expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5},
},
{
name: "Error getting volume directory name for pvc in pod",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(),
notNilExpose: true,
mockClose: true,
expectedStatusMsg: "error identifying unique volume path on host",
},
{
name: "Unable to update status to in progress for data download",
dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(),
@@ -402,17 +396,18 @@
r.dataPathMgr = datapath.NewManager(1)
}

datapath.FSBRCreator = func(string, string, kbclient.Client, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
fsBR := datapathmockes.NewAsyncBR(t)
datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string,
string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR {
asyncBR := datapathmockes.NewAsyncBR(t)
if test.mockCancel {
fsBR.On("Cancel").Return()
asyncBR.On("Cancel").Return()
}

if test.mockClose {
fsBR.On("Close", mock.Anything).Return()
asyncBR.On("Close", mock.Anything).Return()
}

return fsBR
return asyncBR
}

if test.isExposeErr || test.isGetExposeErr || test.isPeekExposeErr || test.isNilExposer || test.notNilExpose {
@@ -443,7 +438,8 @@

if test.needCreateFSBR {
if fsBR := r.dataPathMgr.GetAsyncBR(test.dd.Name); fsBR == nil {
_, err := r.dataPathMgr.CreateFileSystemBR(test.dd.Name, pVBRRequestor, ctx, r.client, velerov1api.DefaultNamespace, datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, velerotest.NewLogger())
_, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeRestore, test.dd.Name, pVBRRequestor,
velerov1api.DefaultNamespace, "", "", datapath.Callbacks{OnCancelled: r.OnDataDownloadCancelled}, false, velerotest.NewLogger())
require.NoError(t, err)
}
}
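One test-infrastructure detail worth noting: the mock is injected by assigning to datapath.MicroServiceBRWatcherCreator (previously datapath.FSBRCreator), i.e. the constructor is exposed as a package-level variable rather than a plain function. A generic sketch of that seam, with hypothetical names standing in for the real types:

package widget

// Runner is whatever the constructor produces; AsyncBR plays this role in the real code.
type Runner interface{ Run() error }

type realRunner struct{ name string }

func (r *realRunner) Run() error { return nil }

// NewRunner is a variable, not a func declaration, so a test can swap it out:
//
//	saved := widget.NewRunner
//	defer func() { widget.NewRunner = saved }()
//	widget.NewRunner = func(name string) Runner { return &mockRunner{} }
var NewRunner = func(name string) Runner { return &realRunner{name: name} }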