Skip to content

Commit

Permalink
data mover ms smoke testing
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Aug 15, 2024
1 parent d25a908 commit 0ed1a7f
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 15 deletions.
38 changes: 33 additions & 5 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ import (
"github.com/vmware-tanzu/velero/pkg/repository"
"github.com/vmware-tanzu/velero/pkg/util/filesystem"
"github.com/vmware-tanzu/velero/pkg/util/logging"

cacheutil "k8s.io/client-go/tools/cache"
)

var (
Expand Down Expand Up @@ -309,14 +311,17 @@ func (s *nodeAgentServer) run() {
}

go func() {
s.mgr.GetCache().WaitForCacheSync(s.ctx)
if err := s.waitCacheForResume(); err != nil {
s.logger.WithError(err).Error("Failed to wait cache for resume, will not resume DU/DD")
return

Check warning on line 316 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L314-L316

Added lines #L314 - L316 were not covered by tests
}

if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data upload resume")

Check warning on line 320 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L319-L320

Added lines #L319 - L320 were not covered by tests
}

if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data download resume")

Check warning on line 324 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L323-L324

Added lines #L323 - L324 were not covered by tests
}
}()

Expand All @@ -327,6 +332,29 @@ func (s *nodeAgentServer) run() {
}
}

func (s *nodeAgentServer) waitCacheForResume() error {
podInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &v1.Pod{})
if err != nil {
return errors.Wrap(err, "error getting pod informer")

Check warning on line 338 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L335-L338

Added lines #L335 - L338 were not covered by tests
}

duInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataUpload{})
if err != nil {
return errors.Wrap(err, "error getting du informer")

Check warning on line 343 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L341-L343

Added lines #L341 - L343 were not covered by tests
}

ddInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataDownload{})
if err != nil {
return errors.Wrap(err, "error getting dd informer")

Check warning on line 348 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L346-L348

Added lines #L346 - L348 were not covered by tests
}

if !cacheutil.WaitForCacheSync(s.ctx.Done(), podInformer.HasSynced, duInformer.HasSynced, ddInformer.HasSynced) {
return errors.New("error waiting informer synced")

Check warning on line 352 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L351-L352

Added lines #L351 - L352 were not covered by tests
}

return nil

Check warning on line 355 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L355

Added line #L355 was not covered by tests
}

// validatePodVolumesHostPath validates that the pod volumes path contains a
// directory for each Pod running on this node
func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,9 +767,9 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name

var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath

func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, logger *logrus.Entry, ns string) error {
dataDownloads := &velerov2alpha1api.DataDownloadList{}
if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
if err := r.client.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
return errors.Wrapf(err, "error to list datadownloads")
}
Expand All @@ -795,7 +795,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it")

resumeErr := err
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
err = UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
Expand All @@ -812,7 +812,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase")

err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/data_download_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ func TestAttemptDataDownloadResume(t *testing.T) {
funcResumeCancellableDataBackup = dt.resumeCancellableDataPath

// Run the test
err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.dd.Namespace)
err = r.AttemptDataDownloadResume(ctx, r.logger.WithField("name", test.name), test.dd.Namespace)

if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,9 +867,9 @@ func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namesp

var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath

func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logger *logrus.Entry, ns string) error {
dataUploads := &velerov2alpha1api.DataUploadList{}
if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
if err := r.client.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
return errors.Wrapf(err, "error to list datauploads")
}
Expand All @@ -895,7 +895,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli
logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it")

resumeErr := err
err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
err = UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
Expand All @@ -912,7 +912,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli
} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase")

err := UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/data_upload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,7 @@ func TestAttemptDataUploadResume(t *testing.T) {
funcResumeCancellableDataBackup = dt.resumeCancellableDataPath

// Run the test
err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)
err = r.AttemptDataUploadResume(ctx, r.logger.WithField("name", test.name), test.du.Namespace)

if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
Expand Down

0 comments on commit 0ed1a7f

Please sign in to comment.