Skip to content

Commit

Permalink
data mover backup for Windows nodes
Browse files Browse the repository at this point in the history
Signed-off-by: Lyndon-Li <[email protected]>
  • Loading branch information
Lyndon-Li committed Dec 24, 2024
1 parent 78c97d9 commit 7a3bde1
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 33 deletions.
19 changes: 18 additions & 1 deletion pkg/cmd/cli/datamover/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,24 @@ func newdataMoverBackup(logger logrus.FieldLogger, factory client.Factory, confi
return nil, errors.Wrap(err, "error to create client")
}

cache, err := ctlcache.New(clientConfig, cacheOption)
var cache ctlcache.Cache
retry := 10
for {
cache, err = ctlcache.New(clientConfig, cacheOption)
if err == nil {
break
}

retry--
if retry == 0 {
break
}

logger.WithError(err).Warn("Failed to create client cache, need retry")

time.Sleep(time.Second)
}

if err != nil {
cancelFunc()
return nil, errors.Wrap(err, "error to create client cache")
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/data_download_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request

hostingPodLabels := map[string]string{velerov1api.DataDownloadLabel: dd.Name}
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(ctx, r.kubeClient, dd.Namespace, k); err != nil {
if v, err := nodeagent.GetLabelValue(ctx, r.kubeClient, dd.Namespace, k, kube.NodeOSLinux); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
log.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/controller/data_upload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,14 +803,19 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
return nil, errors.Wrapf(err, "failed to get PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}

nodeOS, err := kube.GetPVCAttachingNodeOS(pvc, r.kubeClient.CoreV1(), r.kubeClient.StorageV1(), r.logger)
if err != nil {
return nil, errors.Wrapf(err, "failed to get attaching node OS for PVC %s/%s", du.Spec.SourceNamespace, du.Spec.SourcePVC)
}

accessMode := exposer.AccessModeFileSystem
if pvc.Spec.VolumeMode != nil && *pvc.Spec.VolumeMode == corev1.PersistentVolumeBlock {
accessMode = exposer.AccessModeBlock
}

hostingPodLabels := map[string]string{velerov1api.DataUploadLabel: du.Name}
for _, k := range util.ThirdPartyLabels {
if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, du.Namespace, k); err != nil {
if v, err := nodeagent.GetLabelValue(context.Background(), r.kubeClient, du.Namespace, k, nodeOS); err != nil {
if err != nodeagent.ErrNodeAgentLabelNotFound {
r.logger.WithError(err).Warnf("Failed to check node-agent label, skip adding host pod label %s", k)
}
Expand All @@ -831,6 +836,7 @@ func (r *DataUploadReconciler) setupExposeParam(du *velerov2alpha1api.DataUpload
Affinity: r.loadAffinity,
BackupPVCConfig: r.backupPVCConfig,
Resources: r.podResources,
NodeOS: nodeOS,
}, nil
}
return nil, nil
Expand Down
52 changes: 39 additions & 13 deletions pkg/exposer/csi_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ type CSISnapshotExposeParam struct {

// Resources defines the resource requirements of the hosting pod
Resources corev1.ResourceRequirements

// NodeOS specifies the OS of the node to which the source volume is attached
NodeOS string
}

// CSISnapshotExposeWaitParam define the input param for WaitExposed of CSI snapshots
Expand Down Expand Up @@ -212,6 +215,7 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje
csiExposeParam.Resources,
backupPVCReadOnly,
spcNoRelabeling,
csiExposeParam.NodeOS,
)
if err != nil {
return errors.Wrap(err, "error to create backup pod")
Expand Down Expand Up @@ -517,13 +521,14 @@ func (e *csiSnapshotExposer) createBackupPod(
resources corev1.ResourceRequirements,
backupPVCReadOnly bool,
spcNoRelabeling bool,
nodeOS string,
) (*corev1.Pod, error) {
podName := ownerObject.Name

containerName := string(ownerObject.UID)
volumeName := string(ownerObject.UID)

podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace)
podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace, nodeOS)
if err != nil {
return nil, errors.Wrap(err, "error to get inherited pod info from node-agent")
}
Expand Down Expand Up @@ -567,13 +572,40 @@ func (e *csiSnapshotExposer) createBackupPod(
args = append(args, podInfo.logFormatArgs...)
args = append(args, podInfo.logLevelArgs...)

userID := int64(0)

affinityList := make([]*kube.LoadAffinity, 0)
if affinity != nil {
affinityList = append(affinityList, affinity)
}

var securityCtx *corev1.PodSecurityContext
nodeSelector := map[string]string{}
podOS := corev1.PodOS{}
if nodeOS == kube.NodeOSWindows {
userID := "ContainerAdministrator"
securityCtx = &corev1.PodSecurityContext{
WindowsOptions: &corev1.WindowsSecurityContextOptions{
RunAsUserName: &userID,
},
}

nodeSelector[kube.NodeOSLabel] = kube.NodeOSWindows
podOS.Name = kube.NodeOSWindows
} else {
userID := int64(0)
securityCtx = &corev1.PodSecurityContext{
RunAsUser: &userID,
}

if spcNoRelabeling {
securityCtx.SELinuxOptions = &corev1.SELinuxOptions{
Type: "spc_t",
}
}

nodeSelector[kube.NodeOSLabel] = kube.NodeOSLinux
podOS.Name = kube.NodeOSLinux
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Expand Down Expand Up @@ -602,7 +634,9 @@ func (e *csiSnapshotExposer) createBackupPod(
},
},
},
Affinity: kube.ToSystemAffinity(affinityList),
NodeSelector: nodeSelector,
OS: &podOS,
Affinity: kube.ToSystemAffinity(affinityList),
Containers: []corev1.Container{
{
Name: containerName,
Expand All @@ -625,17 +659,9 @@ func (e *csiSnapshotExposer) createBackupPod(
TerminationGracePeriodSeconds: &gracePeriod,
Volumes: volumes,
RestartPolicy: corev1.RestartPolicyNever,
SecurityContext: &corev1.PodSecurityContext{
RunAsUser: &userID,
},
SecurityContext: securityCtx,
},
}

if spcNoRelabeling {
pod.Spec.SecurityContext.SELinuxOptions = &corev1.SELinuxOptions{
Type: "spc_t",
}
}

return e.kubeClient.CoreV1().Pods(ownerObject.Namespace).Create(ctx, pod, metav1.CreateOptions{})
}
2 changes: 1 addition & 1 deletion pkg/exposer/generic_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func (e *genericRestoreExposer) createRestorePod(ctx context.Context, ownerObjec
containerName := string(ownerObject.UID)
volumeName := string(ownerObject.UID)

podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace)
podInfo, err := getInheritedPodInfo(ctx, e.kubeClient, ownerObject.Namespace, kube.NodeOSLinux)
if err != nil {
return nil, errors.Wrap(err, "error to get inherited pod info from node-agent")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/exposer/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ type inheritedPodInfo struct {
logFormatArgs []string
}

func getInheritedPodInfo(ctx context.Context, client kubernetes.Interface, veleroNamespace string) (inheritedPodInfo, error) {
func getInheritedPodInfo(ctx context.Context, client kubernetes.Interface, veleroNamespace string, osType string) (inheritedPodInfo, error) {
podInfo := inheritedPodInfo{}

podSpec, err := nodeagent.GetPodSpec(ctx, client, veleroNamespace)
podSpec, err := nodeagent.GetPodSpec(ctx, client, veleroNamespace, osType)
if err != nil {
return podInfo, errors.Wrap(err, "error to get node-agent pod template")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/exposer/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/vmware-tanzu/velero/pkg/util/kube"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -322,7 +323,7 @@ func TestGetInheritedPodInfo(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)
info, err := getInheritedPodInfo(context.Background(), fakeKubeClient, test.namespace)
info, err := getInheritedPodInfo(context.Background(), fakeKubeClient, test.namespace, kube.NodeOSLinux)

if test.expectErr == "" {
assert.NoError(t, err)
Expand Down
22 changes: 16 additions & 6 deletions pkg/nodeagent/node_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,15 @@ func isRunningInNode(ctx context.Context, namespace string, nodeName string, crC
return errors.Errorf("daemonset pod not found in running state in node %s", nodeName)
}

func GetPodSpec(ctx context.Context, kubeClient kubernetes.Interface, namespace string) (*v1.PodSpec, error) {
ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, daemonSet, metav1.GetOptions{})
func GetPodSpec(ctx context.Context, kubeClient kubernetes.Interface, namespace string, osType string) (*v1.PodSpec, error) {
dsName := daemonSet
if osType == kube.NodeOSWindows {
dsName = daemonsetWindows
}

ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, dsName, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "error to get node-agent daemonset")
return nil, errors.Wrapf(err, "error to get %s daemonset", dsName)
}

return &ds.Spec.Template.Spec, nil
Expand Down Expand Up @@ -190,10 +195,15 @@ func GetConfigs(ctx context.Context, namespace string, kubeClient kubernetes.Int
return configs, nil
}

func GetLabelValue(ctx context.Context, kubeClient kubernetes.Interface, namespace string, key string) (string, error) {
ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, daemonSet, metav1.GetOptions{})
func GetLabelValue(ctx context.Context, kubeClient kubernetes.Interface, namespace string, key string, osType string) (string, error) {
dsName := daemonSet
if osType == kube.NodeOSWindows {
dsName = daemonsetWindows
}

ds, err := kubeClient.AppsV1().DaemonSets(namespace).Get(ctx, dsName, metav1.GetOptions{})
if err != nil {
return "", errors.Wrap(err, "error getting node-agent daemonset")
return "", errors.Wrapf(err, "error getting %s daemonset", dsName)
}

if ds.Spec.Template.Labels == nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/nodeagent/node_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/vmware-tanzu/velero/pkg/builder"
"github.com/vmware-tanzu/velero/pkg/util/kube"
)

type reactor struct {
Expand Down Expand Up @@ -229,7 +230,7 @@ func TestGetPodSpec(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)

spec, err := GetPodSpec(context.TODO(), fakeKubeClient, test.namespace)
spec, err := GetPodSpec(context.TODO(), fakeKubeClient, test.namespace, kube.NodeOSLinux)
if test.expectErr == "" {
assert.NoError(t, err)
assert.Equal(t, *spec, test.expectSpec)
Expand Down Expand Up @@ -450,7 +451,7 @@ func TestGetLabelValue(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
fakeKubeClient := fake.NewSimpleClientset(test.kubeClientObj...)

value, err := GetLabelValue(context.TODO(), fakeKubeClient, test.namespace, "fake-label")
value, err := GetLabelValue(context.TODO(), fakeKubeClient, test.namespace, "fake-label", kube.NodeOSLinux)
if test.expectErr == "" {
assert.NoError(t, err)
assert.Equal(t, test.expectedValue, value)
Expand Down
31 changes: 26 additions & 5 deletions pkg/util/kube/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,43 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1api "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// Node OS values and the node label key they are read from.
const (
// NodeOSLinux is the NodeOSLabel value that identifies a Linux node.
NodeOSLinux = "linux"
// NodeOSWindows is the NodeOSLabel value that identifies a Windows node.
NodeOSWindows = "windows"
// NodeOSLabel is the node label key whose value holds the node's OS type.
NodeOSLabel = "kubernetes.io/os"
)

func IsLinuxNode(ctx context.Context, nodeName string, client client.Client) error {
node := &corev1api.Node{}
if err := client.Get(ctx, types.NamespacedName{Name: nodeName}, node); err != nil {
return errors.Wrapf(err, "error getting node %s", nodeName)
}

os, found := node.Labels["kubernetes.io/os"]
os, found := node.Labels[NodeOSLabel]

if !found {
return errors.Errorf("no os type label for node %s", nodeName)
}

if os != "linux" {
if os != NodeOSLinux {
return errors.Errorf("os type %s for node %s is not linux", os, nodeName)
}

return nil
}

func WithLinuxNode(ctx context.Context, client client.Client, log logrus.FieldLogger) bool {
return withOSNode(ctx, client, "linux", log)
return withOSNode(ctx, client, NodeOSLinux, log)
}

func WithWindowsNode(ctx context.Context, client client.Client, log logrus.FieldLogger) bool {
return withOSNode(ctx, client, "windows", log)
return withOSNode(ctx, client, NodeOSWindows, log)
}

func withOSNode(ctx context.Context, client client.Client, osType string, log logrus.FieldLogger) bool {
Expand All @@ -61,7 +69,7 @@ func withOSNode(ctx context.Context, client client.Client, osType string, log lo

allNodeLabeled := true
for _, node := range nodeList.Items {
os, found := node.Labels["kubernetes.io/os"]
os, found := node.Labels[NodeOSLabel]

if os == osType {
return true
Expand All @@ -78,3 +86,16 @@ func withOSNode(ctx context.Context, client client.Client, osType string, log lo

return false
}

// GetNodeOS returns the OS type recorded in the NodeOSLabel label of the node
// named nodeName.
//
// The node is fetched through nodeClient using ctx, so cancellation and
// deadlines set by the caller apply to the API request.
//
// It returns an empty string and a nil error when the node has no labels or
// does not carry the NodeOSLabel label. It returns a wrapped error when the
// node cannot be retrieved.
func GetNodeOS(ctx context.Context, nodeName string, nodeClient corev1client.CoreV1Interface) (string, error) {
node, err := nodeClient.Nodes().Get(ctx, nodeName, metav1.GetOptions{})
if err != nil {
return "", errors.Wrapf(err, "error getting node %s", nodeName)
}

// Indexing a nil map yields the zero value, so a node without labels
// results in "" without needing a separate nil check.
return node.Labels[NodeOSLabel], nil
}
Loading

0 comments on commit 7a3bde1

Please sign in to comment.