Skip to content

Commit

Permalink
data mover ms node agent resume
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Aug 6, 2024
1 parent d4e743b commit a523d10
Show file tree
Hide file tree
Showing 9 changed files with 799 additions and 502 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8085-Lyndon-Li
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
According to design #7576, after node-agent restarts, if a DU/DD is in InProgress status, re-capture the data mover ms pod and continue the execution
25 changes: 25 additions & 0 deletions pkg/builder/data_download_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package builder
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)

Expand Down Expand Up @@ -116,3 +117,27 @@ func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownlo
d.object.Status.StartTimestamp = startTime
return d
}

// CompletionTimestamp sets the DataDownload's StartTimestamp.
func (d *DataDownloadBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *DataDownloadBuilder {
d.object.Status.CompletionTimestamp = completionTimestamp
return d
}

// Labels sets the DataDownload's Labels.
func (d *DataDownloadBuilder) Labels(labels map[string]string) *DataDownloadBuilder {
d.object.Labels = labels
return d
}

// Labels sets the DataDownload's Progress.
func (d *DataDownloadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataDownloadBuilder {
d.object.Status.Progress = progress
return d
}

// Node sets the DataDownload's Node.
func (d *DataDownloadBuilder) Node(node string) *DataDownloadBuilder {
d.object.Status.Node = node
return d
}
7 changes: 7 additions & 0 deletions pkg/builder/data_upload_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder
return d
}

// Labels sets the DataUpload's Progress.
func (d *DataUploadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataUploadBuilder {
d.object.Status.Progress = progress
return d
}

// Node sets the DataUpload's Node.
func (d *DataUploadBuilder) Node(node string) *DataUploadBuilder {
d.object.Status.Node = node
return d
}
43 changes: 14 additions & 29 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,18 +292,28 @@ func (s *nodeAgentServer) run() {
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0]
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataUploadResume(dataUploadReconciler)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)

Check warning on line 295 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L295

Added line #L295 was not covered by tests
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataDownloadResume(dataDownloadReconciler)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)

Check warning on line 300 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L300

Added line #L300 was not covered by tests
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}

go func() {
s.mgr.GetCache().WaitForCacheSync(s.ctx)

Check warning on line 306 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L305-L306

Added lines #L305 - L306 were not covered by tests

if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")

Check warning on line 309 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L308-L309

Added lines #L308 - L309 were not covered by tests
}

if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")

Check warning on line 313 in pkg/cmd/cli/nodeagent/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/cli/nodeagent/server.go#L312-L313

Added lines #L312 - L313 were not covered by tests
}
}()

s.logger.Info("Controllers starting...")

if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down Expand Up @@ -373,31 +383,6 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
s.markInProgressPVRsFailed(client)
}

func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) {
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
return
}
if err := r.AttemptDataUploadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
}
}

func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) {
// the function is called before starting the controller manager, the embedded client isn't ready to use, so create a new one here
client, err := ctrlclient.New(s.mgr.GetConfig(), ctrlclient.Options{Scheme: s.mgr.GetScheme()})
if err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to create client")
return
}

if err := r.AttemptDataDownloadResume(s.ctx, client, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
}
}

func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
pvbs := &velerov1api.PodVolumeBackupList{}
if err := client.List(s.ctx, pvbs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil {
Expand Down
20 changes: 16 additions & 4 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,9 +1148,15 @@ func markDataUploadsCancel(ctx context.Context, client ctrlclient.Client, backup
du.Status.Phase == velerov2alpha1api.DataUploadPhaseNew ||
du.Status.Phase == "" {
err := controller.UpdateDataUploadWithRetry(ctx, client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, log.WithField("dataupload", du.Name),
func(dataUpload *velerov2alpha1api.DataUpload) {
func(dataUpload *velerov2alpha1api.DataUpload) bool {
if dataUpload.Spec.Cancel {
return false

Check warning on line 1153 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1151-L1153

Added lines #L1151 - L1153 were not covered by tests
}

dataUpload.Spec.Cancel = true
dataUpload.Status.Message = fmt.Sprintf("found a dataupload with status %q during the velero server starting, mark it as cancel", du.Status.Phase)
dataUpload.Status.Message = fmt.Sprintf("Dataupload is in status %q during the velero server starting, mark it as cancel", du.Status.Phase)

Check warning on line 1157 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1157

Added line #L1157 was not covered by tests

return true

Check warning on line 1159 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1159

Added line #L1159 was not covered by tests
})

if err != nil {
Expand Down Expand Up @@ -1183,9 +1189,15 @@ func markDataDownloadsCancel(ctx context.Context, client ctrlclient.Client, rest
dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseNew ||
dd.Status.Phase == "" {
err := controller.UpdateDataDownloadWithRetry(ctx, client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, log.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
func(dataDownload *velerov2alpha1api.DataDownload) bool {
if dataDownload.Spec.Cancel {
return false

Check warning on line 1194 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1192-L1194

Added lines #L1192 - L1194 were not covered by tests
}

dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the velero server starting, mark it as cancel", dd.Status.Phase)
dataDownload.Status.Message = fmt.Sprintf("Datadownload is in status %q during the velero server starting, mark it as cancel", dd.Status.Phase)

Check warning on line 1198 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1198

Added line #L1198 was not covered by tests

return true

Check warning on line 1200 in pkg/cmd/server/server.go

View check run for this annotation

Codecov / codecov/patch

pkg/cmd/server/server.go#L1200

Added line #L1200 was not covered by tests
})

if err != nil {
Expand Down
Loading

0 comments on commit a523d10

Please sign in to comment.