Skip to content

Commit

Permalink
data mover ms node agent resume
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Aug 5, 2024
1 parent d4e743b commit 89e72d0
Show file tree
Hide file tree
Showing 7 changed files with 695 additions and 471 deletions.
25 changes: 25 additions & 0 deletions pkg/builder/data_download_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package builder
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/vmware-tanzu/velero/pkg/apis/velero/shared"
velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1"
)

Expand Down Expand Up @@ -116,3 +117,27 @@ func (d *DataDownloadBuilder) StartTimestamp(startTime *metav1.Time) *DataDownlo
d.object.Status.StartTimestamp = startTime
return d
}

// CompletionTimestamp sets the DataDownload's Status.CompletionTimestamp
// and returns the builder so that calls can be chained.
func (d *DataDownloadBuilder) CompletionTimestamp(completionTimestamp *metav1.Time) *DataDownloadBuilder {
d.object.Status.CompletionTimestamp = completionTimestamp
return d
}

// Labels sets the DataDownload's Labels, replacing whatever label map was
// set before, and returns the builder so that calls can be chained.
func (d *DataDownloadBuilder) Labels(labels map[string]string) *DataDownloadBuilder {
d.object.Labels = labels
return d
}

// Progress sets the DataDownload's Status.Progress
// and returns the builder so that calls can be chained.
func (d *DataDownloadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataDownloadBuilder {
d.object.Status.Progress = progress
return d
}

// Node sets the DataDownload's Status.Node
// and returns the builder so that calls can be chained.
func (d *DataDownloadBuilder) Node(node string) *DataDownloadBuilder {
d.object.Status.Node = node
return d
}
7 changes: 7 additions & 0 deletions pkg/builder/data_upload_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,14 @@ func (d *DataUploadBuilder) Labels(labels map[string]string) *DataUploadBuilder
return d
}

// Progress sets the DataUpload's Status.Progress
// and returns the builder so that calls can be chained.
func (d *DataUploadBuilder) Progress(progress shared.DataMoveOperationProgress) *DataUploadBuilder {
d.object.Status.Progress = progress
return d
}

// Node sets the DataUpload's Status.Node
// and returns the builder so that calls can be chained.
func (d *DataUploadBuilder) Node(node string) *DataUploadBuilder {
d.object.Status.Node = node
return d
}
43 changes: 14 additions & 29 deletions pkg/cmd/cli/nodeagent/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,18 +292,28 @@ func (s *nodeAgentServer) run() {
if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 {
loadAffinity = s.dataPathConfigs.LoadAffinity[0]
}
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataUploadResume(dataUploadReconciler)
dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data upload controller")
}

dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
s.attemptDataDownloadResume(dataDownloadReconciler)
dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics)
if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil {
s.logger.WithError(err).Fatal("Unable to create the data download controller")
}

go func() {
s.mgr.GetCache().WaitForCacheSync(s.ctx)

if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
}

if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
}
}()

s.logger.Info("Controllers starting...")

if err := s.mgr.Start(ctrl.SetupSignalHandler()); err != nil {
Expand Down Expand Up @@ -373,31 +383,6 @@ func (s *nodeAgentServer) markInProgressCRsFailed() {
s.markInProgressPVRsFailed(client)
}

// attemptDataUploadResume invokes the reconciler's AttemptDataUploadResume with the
// server's context, node-scoped logger and namespace. Errors are logged, never returned.
func (s *nodeAgentServer) attemptDataUploadResume(r *controller.DataUploadReconciler) {
	// This is called before the controller manager is started, so the manager's embedded
	// client isn't ready to use yet; build a dedicated client from the manager's config and scheme.
	restConfig := s.mgr.GetConfig()
	clientOptions := ctrlclient.Options{Scheme: s.mgr.GetScheme()}

	standaloneClient, clientErr := ctrlclient.New(restConfig, clientOptions)
	if clientErr != nil {
		s.logger.WithError(errors.WithStack(clientErr)).Error("failed to create client")
		return
	}

	nodeLogger := s.logger.WithField("node", s.nodeName)
	resumeErr := r.AttemptDataUploadResume(s.ctx, standaloneClient, nodeLogger, s.namespace)
	if resumeErr != nil {
		s.logger.WithError(errors.WithStack(resumeErr)).Error("failed to attempt data upload resume")
	}
}

// attemptDataDownloadResume invokes the reconciler's AttemptDataDownloadResume with the
// server's context, node-scoped logger and namespace. Errors are logged, never returned.
func (s *nodeAgentServer) attemptDataDownloadResume(r *controller.DataDownloadReconciler) {
	// This is called before the controller manager is started, so the manager's embedded
	// client isn't ready to use yet; build a dedicated client from the manager's config and scheme.
	restConfig := s.mgr.GetConfig()
	clientOptions := ctrlclient.Options{Scheme: s.mgr.GetScheme()}

	standaloneClient, clientErr := ctrlclient.New(restConfig, clientOptions)
	if clientErr != nil {
		s.logger.WithError(errors.WithStack(clientErr)).Error("failed to create client")
		return
	}

	nodeLogger := s.logger.WithField("node", s.nodeName)
	resumeErr := r.AttemptDataDownloadResume(s.ctx, standaloneClient, nodeLogger, s.namespace)
	if resumeErr != nil {
		s.logger.WithError(errors.WithStack(resumeErr)).Error("failed to attempt data download resume")
	}
}

func (s *nodeAgentServer) markInProgressPVBsFailed(client ctrlclient.Client) {
pvbs := &velerov1api.PodVolumeBackupList{}
if err := client.List(s.ctx, pvbs, &ctrlclient.ListOptions{Namespace: s.namespace}); err != nil {
Expand Down
214 changes: 114 additions & 100 deletions pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

Expand All @@ -56,6 +57,7 @@ import (
type DataDownloadReconciler struct {
client client.Client
kubeClient kubernetes.Interface
mgr manager.Manager
logger logrus.FieldLogger
credentialGetter *credentials.CredentialGetter
fileSystem filesystem.Interface
Expand All @@ -68,11 +70,12 @@ type DataDownloadReconciler struct {
metrics *metrics.ServerMetrics
}

func NewDataDownloadReconciler(client client.Client, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager,
repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler {
return &DataDownloadReconciler{
client: client,
kubeClient: kubeClient,
mgr: mgr,
logger: logger.WithField("controller", "DataDownload"),
credentialGetter: credentialGetter,
fileSystem: filesystem.NewFileSystem(),
Expand Down Expand Up @@ -575,75 +578,6 @@ func (r *DataDownloadReconciler) findSnapshotRestoreForPod(ctx context.Context,
return []reconcile.Request{request}
}

// FindDataDownloads lists the pods in namespace ns and, for every pod scheduled on this
// reconciler's node, resolves the DataDownload associated with it via findDataDownloadByPod.
//
// Pods on other nodes are skipped. A lookup failure for a single pod is logged and that pod
// is skipped; only a failure to list the pods is returned as an error.
func (r *DataDownloadReconciler) FindDataDownloads(ctx context.Context, cli client.Client, ns string) ([]*velerov2alpha1api.DataDownload, error) {
	podList := &v1.PodList{}
	listErr := cli.List(ctx, podList, &client.ListOptions{Namespace: ns})
	if listErr != nil {
		r.logger.WithError(errors.WithStack(listErr)).Error("failed to list pods on current node")
		return nil, errors.Wrapf(listErr, "failed to list pods on current node")
	}

	var found []*velerov2alpha1api.DataDownload
	for idx := range podList.Items {
		// Work on a copy of the item, as the original range loop did.
		pod := podList.Items[idx]

		if pod.Spec.NodeName != r.nodeName {
			r.logger.Debugf("Pod %s related data download will not handled by %s nodes", pod.GetName(), r.nodeName)
			continue
		}

		dd, lookupErr := findDataDownloadByPod(cli, pod)
		switch {
		case lookupErr != nil:
			r.logger.WithError(errors.WithStack(lookupErr)).Error("failed to get dataDownload by pod")
		case dd != nil:
			found = append(found, dd)
		}
	}

	return found, nil
}

// findAcceptDataDownloadsByNodeLabel lists the DataDownloads in namespace ns and returns
// those that are in the Accepted phase and whose acceptNodeLabelKey label equals this
// reconciler's node name.
//
// An error is returned only when the list call itself fails.
func (r *DataDownloadReconciler) findAcceptDataDownloadsByNodeLabel(ctx context.Context, cli client.Client, ns string) ([]velerov2alpha1api.DataDownload, error) {
	dataDownloads := &velerov2alpha1api.DataDownloadList{}
	if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
		// The objects being listed are DataDownloads; the messages previously said "datauploads".
		r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
		return nil, errors.Wrapf(err, "failed to list datadownloads")
	}

	var result []velerov2alpha1api.DataDownload
	// Iterate by index so that each item is copied only when it is actually selected.
	for i := range dataDownloads.Items {
		dd := &dataDownloads.Items[i]
		if dd.Status.Phase != velerov2alpha1api.DataDownloadPhaseAccepted {
			continue
		}
		if dd.Labels[acceptNodeLabelKey] == r.nodeName {
			result = append(result, *dd)
		}
	}
	return result, nil
}

// CancelAcceptedDataDownload sets Spec.Cancel on every DataDownload in namespace ns that is
// in the Accepted phase and labelled for this reconciler's node (as selected by
// findAcceptDataDownloadsByNodeLabel). DataDownloads that already have Spec.Cancel set are skipped.
//
// Failures are logged and never returned; a failure on one DataDownload does not stop
// processing of the remaining ones.
func (r *DataDownloadReconciler) CancelAcceptedDataDownload(ctx context.Context, cli client.Client, ns string) {
	r.logger.Infof("Canceling accepted data for node %s", r.nodeName)
	dataDownloads, err := r.findAcceptDataDownloadsByNodeLabel(ctx, cli, ns)
	if err != nil {
		r.logger.WithError(err).Error("failed to find data downloads")
		return
	}

	for i := range dataDownloads {
		dd := &dataDownloads[i]
		if dd.Spec.Cancel {
			continue
		}

		// Build the message once. The update callback mutates the freshly fetched object,
		// not dd, so dd.Status.Message would still hold the stale (pre-update) value.
		msg := fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)

		err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
			r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) {
				dataDownload.Spec.Cancel = true
				dataDownload.Status.Message = msg
			})

		r.logger.Warn(msg)
		if err != nil {
			r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error())
		}
	}
}

func (r *DataDownloadReconciler) prepareDataDownload(ssb *velerov2alpha1api.DataDownload) {
ssb.Status.Phase = velerov2alpha1api.DataDownloadPhasePrepared
ssb.Status.Node = r.nodeName
Expand Down Expand Up @@ -796,55 +730,135 @@ func isDataDownloadInFinalState(dd *velerov2alpha1api.DataDownload) bool {
}

func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, namespacedName types.NamespacedName, log *logrus.Entry, updateFunc func(dataDownload *velerov2alpha1api.DataDownload)) error {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (done bool, err error) {
return wait.PollUntilContextCancel(ctx, time.Second, true, func(ctx context.Context) (bool, error) {
dd := &velerov2alpha1api.DataDownload{}
if err := client.Get(ctx, namespacedName, dd); err != nil {
return false, errors.Wrap(err, "getting DataDownload")
}

updateFunc(dd)
updateErr := client.Update(ctx, dd)
if updateErr != nil {
if apierrors.IsConflict(updateErr) {
err := client.Update(ctx, dd)
if err != nil {
if apierrors.IsConflict(err) {
log.Warnf("failed to update datadownload for %s/%s and will retry it", dd.Namespace, dd.Name)
return false, nil
} else {
return false, errors.Wrapf(err, "error updating datadownload %s/%s", dd.Namespace, dd.Name)
}
log.Errorf("failed to update datadownload with error %s for %s/%s", updateErr.Error(), dd.Namespace, dd.Name)
return false, err
}

return true, nil
})
}

var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath

func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
if dataDownloads, err := r.FindDataDownloads(ctx, cli, ns); err != nil {
return errors.Wrapf(err, "failed to find data downloads")
} else {
for i := range dataDownloads {
dd := dataDownloads[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by datadownload controller after node-agent restart
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

if err != nil {
logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into canceled", dd.GetName())
continue
}
logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled")
dataDownloads := &velerov2alpha1api.DataDownloadList{}
if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
return errors.Wrapf(err, "error to list datadownloads")
}

for i := range dataDownloads.Items {
dd := &dataDownloads.Items[i]
if dd.Status.Phase == velerov2alpha1api.DataDownloadPhasePrepared {
// keep doing nothing let controller re-download the data
// the Prepared CR could be still handled by datadownload controller after node-agent restart
logger.WithField("datadownload", dd.GetName()).Debug("find a datadownload with status prepared")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress {
if dd.Status.Node != r.nodeName {
logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Infof("DD should be resumed by another node %s", dd.Status.Node)
continue
}

err := funcResumeCancellableDataRestore(r, ctx, dd, logger)
if err == nil {
continue
}

logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it")

err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

if err != nil {
logger.WithError(errors.WithStack(err)).Errorf("failed to mark datadownload %q into canceled", dd.GetName())
continue
}
logger.WithField("datadownload", dd.GetName()).Debug("mark datadownload into canceled")
} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
if dd.Labels[acceptNodeLabelKey] != r.nodeName {
continue
}

if dd.Spec.Cancel {
continue
}

err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) {
dataDownload.Spec.Cancel = true
dataDownload.Status.Message = fmt.Sprintf("found a datadownload with status %q during the node-agent starting, mark it as cancel", dd.Status.Phase)
})

r.logger.Warn(dd.Status.Message)
if err != nil {
r.logger.WithError(err).Errorf("failed to set cancel flag with error %s", err.Error())
}
}
}

// If the data download is in Accepted status, the exposed PVC may not have been created yet,
// so we need to mark the data download as canceled because it may not be recoverable
r.CancelAcceptedDataDownload(ctx, cli, ns)
return nil
}

// resumeCancellableDataPath re-attaches this reconciler to the data path of the given
// DataDownload by creating a micro-service BR watcher against the hosting pod reported by
// the restore exposer, initializing it and calling StartRestore on it.
//
// It returns an error when the exposed volume info cannot be obtained or is missing, or when
// creating, initializing or starting the watcher fails. If the watcher was created but a
// later step fails, the data path for dd.Name is closed before returning.
func (r *DataDownloadReconciler) resumeCancellableDataPath(ctx context.Context, dd *velerov2alpha1api.DataDownload, log logrus.FieldLogger) error {
	log.Info("Resume cancelable dataDownload")

	res, err := r.restoreExposer.GetExposed(ctx, getDataDownloadOwnerObject(dd), r.client, r.nodeName, dd.Spec.OperationTimeout.Duration)
	if err != nil {
		return errors.Wrapf(err, "error to get exposed volume for dd %s", dd.Name)
	}

	if res == nil {
		return errors.Errorf("expose info missed for dd %s", dd.Name)
	}

	callbacks := datapath.Callbacks{
		OnCompleted: r.OnDataDownloadCompleted,
		OnFailed:    r.OnDataDownloadFailed,
		OnCancelled: r.OnDataDownloadCancelled,
		OnProgress:  r.OnDataDownloadProgress,
	}

	// A DataDownload is a restore operation (StartRestore is called below), so the watcher
	// must be created with the restore task type rather than the backup one.
	// NOTE(review): the boolean argument is presumably the "resume" flag — confirm against
	// CreateMicroServiceBRWatcher.
	asyncBR, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, r.kubeClient, r.mgr, datapath.TaskTypeRestore, dd.Name, dd.Namespace, res.ByPod.HostingPod.Name, res.ByPod.HostingContainer, dd.Name, callbacks, true, log)
	if err != nil {
		return errors.Wrapf(err, "error to create asyncBR watcher for dd %s", dd.Name)
	}

	// From here on the data path exists; close it again unless the resume fully succeeds.
	resumeComplete := false
	defer func() {
		if !resumeComplete {
			r.closeDataPath(ctx, dd.Name)
		}
	}()

	if err := asyncBR.Init(ctx, nil); err != nil {
		return errors.Wrapf(err, "error to init asyncBR watcher for dd %s", dd.Name)
	}

	if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{
		ByPath: res.ByPod.VolumeName,
	}, nil); err != nil {
		return errors.Wrapf(err, "error to resume asyncBR watche for dd %s", dd.Name)
	}

	resumeComplete = true

	log.Infof("asyncBR is resumed for dd %s", dd.Name)

	return nil
}
Loading

0 comments on commit 89e72d0

Please sign in to comment.