diff --git a/api/v1alpha1/step_types.go b/api/v1alpha1/step_types.go index 595de3ee..b4ad70bd 100644 --- a/api/v1alpha1/step_types.go +++ b/api/v1alpha1/step_types.go @@ -23,9 +23,10 @@ import ( "path/filepath" "strconv" + "k8s.io/apimachinery/pkg/util/intstr" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/pointer" ) @@ -110,6 +111,10 @@ func (in Step) GetPodSpec(req GetPodSpecReq) corev1.PodSpec { }, AllowPrivilegeEscalation: pointer.BoolPtr(false), } + priorityClassName := "" + if req.Replica == 0 { + priorityClassName = "lead-replica" + } return corev1.PodSpec{ Volumes: append(in.Spec.Volumes, volumes...), RestartPolicy: in.Spec.RestartPolicy, @@ -119,8 +124,9 @@ func (in Step) GetPodSpec(req GetPodSpecReq) corev1.PodSpec { RunAsNonRoot: pointer.BoolPtr(true), RunAsUser: pointer.Int64Ptr(9653), }, - Affinity: in.Spec.Affinity, - Tolerations: in.Spec.Tolerations, + PriorityClassName: priorityClassName, + Affinity: in.Spec.Affinity, + Tolerations: in.Spec.Tolerations, InitContainers: []corev1.Container{ { Name: CtrInit, diff --git a/api/v1alpha1/step_types_test.go b/api/v1alpha1/step_types_test.go index 5a47030d..db521951 100644 --- a/api/v1alpha1/step_types_test.go +++ b/api/v1alpha1/step_types_test.go @@ -2,6 +2,7 @@ package v1alpha1 import ( "encoding/json" + "fmt" "testing" "time" @@ -13,131 +14,136 @@ import ( ) func TestStep_GetPodSpec(t *testing.T) { - env := []corev1.EnvVar{ - {Name: "ARGO_DATAFLOW_CLUSTER_NAME", Value: "my-cluster"}, - {Name: "ARGO_DATAFLOW_DEBUG", Value: "false"}, - {Name: "ARGO_DATAFLOW_NAMESPACE", Value: "my-ns"}, - {Name: "ARGO_DATAFLOW_PIPELINE_NAME", Value: "my-pl"}, - {Name: "ARGO_DATAFLOW_REPLICA", Value: "1"}, - {Name: "ARGO_DATAFLOW_STEP", Value: `{"metadata":{"creationTimestamp":null},"spec":{"name":"main","cat":{},"scale":{},"sidecar":{"resources":{}}},"status":{"phase":"","replicas":0,"lastScaledAt":null}}`}, - {Name: "ARGO_DATAFLOW_UPDATE_INTERVAL", Value: "1m0s"}, - {Name: "GODEBUG"}, - } - mounts := []corev1.VolumeMount{{Name: "var-run-argo-dataflow", MountPath: "/var/run/argo-dataflow"}} - dropAll := &corev1.SecurityContext{ - Capabilities: &corev1.Capabilities{ - Drop: []corev1.Capability{"all"}, - }, - AllowPrivilegeEscalation: pointer.BoolPtr(false), - } - tests := []struct { - name string - step Step - req GetPodSpecReq - want corev1.PodSpec - }{ - { - "Cat", - Step{ - Spec: StepSpec{ - Name: "main", - Cat: &Cat{}, + for replica, priorityClassName := range map[int]string{0: "lead-replica", 1: ""} { + t.Run(fmt.Sprintf("Replica%d", replica), func(t *testing.T) { + env := []corev1.EnvVar{ + {Name: "ARGO_DATAFLOW_CLUSTER_NAME", Value: "my-cluster"}, + {Name: "ARGO_DATAFLOW_DEBUG", Value: "false"}, + {Name: "ARGO_DATAFLOW_NAMESPACE", Value: "my-ns"}, + {Name: "ARGO_DATAFLOW_PIPELINE_NAME", Value: "my-pl"}, + {Name: "ARGO_DATAFLOW_REPLICA", Value: fmt.Sprintf("%d", replica)}, + {Name: "ARGO_DATAFLOW_STEP", Value: `{"metadata":{"creationTimestamp":null},"spec":{"name":"main","cat":{},"scale":{},"sidecar":{"resources":{}}},"status":{"phase":"","replicas":0,"lastScaledAt":null}}`}, + {Name: "ARGO_DATAFLOW_UPDATE_INTERVAL", Value: "1m0s"}, + {Name: "GODEBUG"}, + } + mounts := []corev1.VolumeMount{{Name: "var-run-argo-dataflow", MountPath: "/var/run/argo-dataflow"}} + dropAll := &corev1.SecurityContext{ + Capabilities: &corev1.Capabilities{ + Drop: []corev1.Capability{"all"}, }, - }, - GetPodSpecReq{ - ClusterName: "my-cluster", - ImageFormat: "image-%s", - Namespace: "my-ns", - PipelineName: "my-pl", - Replica: 1, - RunnerImage: "my-runner", - PullPolicy: corev1.PullAlways, - StepStatus: StepStatus{Phase: StepRunning}, - UpdateInterval: time.Minute, - Sidecar: Sidecar{Resources: standardResources}, - }, - corev1.PodSpec{ - Containers: []corev1.Container{ - { - Args: []string{"sidecar"}, - Env: env, - Image: "my-runner", - ImagePullPolicy: corev1.PullAlways, - Name: "sidecar", - Lifecycle: &corev1.Lifecycle{PreStop: &corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Path: "/pre-stop?source=kubernetes", - Port: intstr.FromInt(3570), - Scheme: "HTTPS", - }, - }}, - Ports: []corev1.ContainerPort{{ContainerPort: 3570}}, - ReadinessProbe: &corev1.Probe{ - Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{Path: "/ready", Port: intstr.FromInt(3570), Scheme: "HTTPS"}, - }, + AllowPrivilegeEscalation: pointer.BoolPtr(false), + } + tests := []struct { + name string + step Step + req GetPodSpecReq + want corev1.PodSpec + }{ + { + "Cat", + Step{ + Spec: StepSpec{ + Name: "main", + Cat: &Cat{}, }, - Resources: standardResources, - SecurityContext: dropAll, - VolumeMounts: mounts, }, - { - Args: []string{"cat"}, - Image: "my-runner", - ImagePullPolicy: corev1.PullAlways, - Name: "main", - Lifecycle: &corev1.Lifecycle{PreStop: &corev1.Handler{ - Exec: &corev1.ExecAction{Command: []string{"/var/run/argo-dataflow/prestop"}}, - }}, - Resources: standardResources, - SecurityContext: dropAll, - VolumeMounts: mounts, + GetPodSpecReq{ + ClusterName: "my-cluster", + ImageFormat: "image-%s", + Namespace: "my-ns", + PipelineName: "my-pl", + Replica: int32(replica), + RunnerImage: "my-runner", + PullPolicy: corev1.PullAlways, + StepStatus: StepStatus{Phase: StepRunning}, + UpdateInterval: time.Minute, + Sidecar: Sidecar{Resources: standardResources}, }, - }, - InitContainers: []corev1.Container{ - { - Args: []string{"init"}, - Env: env, - Image: "my-runner", - ImagePullPolicy: corev1.PullAlways, - Name: "init", - Resources: standardResources, - SecurityContext: dropAll, - VolumeMounts: append(mounts, corev1.VolumeMount{ - Name: "ssh", - ReadOnly: true, - MountPath: "/.ssh", - }), - }, - }, - SecurityContext: &corev1.PodSecurityContext{ - RunAsUser: pointer.Int64Ptr(9653), - RunAsNonRoot: pointer.BoolPtr(true), - }, - Volumes: []corev1.Volume{ - { - Name: "var-run-argo-dataflow", - VolumeSource: corev1.VolumeSource{ - EmptyDir: &corev1.EmptyDirVolumeSource{}, + corev1.PodSpec{ + Containers: []corev1.Container{ + { + Args: []string{"sidecar"}, + Env: env, + Image: "my-runner", + ImagePullPolicy: corev1.PullAlways, + Name: "sidecar", + Lifecycle: &corev1.Lifecycle{PreStop: &corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/pre-stop?source=kubernetes", + Port: intstr.FromInt(3570), + Scheme: "HTTPS", + }, + }}, + Ports: []corev1.ContainerPort{{ContainerPort: 3570}}, + ReadinessProbe: &corev1.Probe{ + Handler: corev1.Handler{ + HTTPGet: &corev1.HTTPGetAction{Path: "/ready", Port: intstr.FromInt(3570), Scheme: "HTTPS"}, + }, + }, + Resources: standardResources, + SecurityContext: dropAll, + VolumeMounts: mounts, + }, + { + Args: []string{"cat"}, + Image: "my-runner", + ImagePullPolicy: corev1.PullAlways, + Name: "main", + Lifecycle: &corev1.Lifecycle{PreStop: &corev1.Handler{ + Exec: &corev1.ExecAction{Command: []string{"/var/run/argo-dataflow/prestop"}}, + }}, + Resources: standardResources, + SecurityContext: dropAll, + VolumeMounts: mounts, + }, }, - }, { - Name: "ssh", - VolumeSource: corev1.VolumeSource{ - Secret: &corev1.SecretVolumeSource{ - SecretName: "ssh", - DefaultMode: pointer.Int32Ptr(0o644), + InitContainers: []corev1.Container{ + { + Args: []string{"init"}, + Env: env, + Image: "my-runner", + ImagePullPolicy: corev1.PullAlways, + Name: "init", + Resources: standardResources, + SecurityContext: dropAll, + VolumeMounts: append(mounts, corev1.VolumeMount{ + Name: "ssh", + ReadOnly: true, + MountPath: "/.ssh", + }), + }, + }, + PriorityClassName: priorityClassName, + SecurityContext: &corev1.PodSecurityContext{ + RunAsUser: pointer.Int64Ptr(9653), + RunAsNonRoot: pointer.BoolPtr(true), + }, + Volumes: []corev1.Volume{ + { + Name: "var-run-argo-dataflow", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, { + Name: "ssh", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "ssh", + DefaultMode: pointer.Int32Ptr(0o644), + }, + }, }, }, }, }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, _ := json.MarshalIndent(tt.step.GetPodSpec(tt.req), "", " ") - want, _ := json.MarshalIndent(tt.want, "", " ") - assert.Equal(t, string(want), string(got)) + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, _ := json.MarshalIndent(tt.step.GetPodSpec(tt.req), "", " ") + want, _ := json.MarshalIndent(tt.want, "", " ") + assert.Equal(t, string(want), string(got)) + }) + } }) } } diff --git a/config/ci.yaml b/config/ci.yaml index 3b608133..98252f22 100644 --- a/config/ci.yaml +++ b/config/ci.yaml @@ -13606,6 +13606,14 @@ spec: selector: app: testapi --- +apiVersion: scheduling.k8s.io/v1 +description: This priority class is used to ensure that lead replicas are prioritized + over other replicas. +kind: PriorityClass +metadata: + name: lead-replica +value: 1 +--- apiVersion: apps/v1 kind: Deployment metadata: diff --git a/config/default.yaml b/config/default.yaml index 8a7cb29b..3ad01092 100644 --- a/config/default.yaml +++ b/config/default.yaml @@ -13590,6 +13590,14 @@ spec: selector: control-plane: controller-manager --- +apiVersion: scheduling.k8s.io/v1 +description: This priority class is used to ensure that lead replicas are prioritized + over other replicas. +kind: PriorityClass +metadata: + name: lead-replica +value: 1 +--- apiVersion: apps/v1 kind: Deployment metadata: diff --git a/config/default/kustomization.yaml b/config/default/kustomization.yaml index 40f4225e..f835e132 100644 --- a/config/default/kustomization.yaml +++ b/config/default/kustomization.yaml @@ -18,6 +18,7 @@ bases: resources: - ssh-configmap.yaml + - lead-replica-priorityclass.yaml patchesStrategicMerge: # Protect the /metrics endpoint by putting it behind auth. diff --git a/config/default/lead-replica-priorityclass.yaml b/config/default/lead-replica-priorityclass.yaml new file mode 100644 index 00000000..6c1c36d9 --- /dev/null +++ b/config/default/lead-replica-priorityclass.yaml @@ -0,0 +1,6 @@ +apiVersion: scheduling.k8s.io/v1 +kind: PriorityClass +metadata: + name: lead-replica +value: 1 +description: "This priority class is used to ensure that lead replicas are prioritized over other replicas." \ No newline at end of file diff --git a/config/dev.yaml b/config/dev.yaml index 0cbfc1a1..8d02b973 100644 --- a/config/dev.yaml +++ b/config/dev.yaml @@ -13606,6 +13606,14 @@ spec: selector: app: testapi --- +apiVersion: scheduling.k8s.io/v1 +description: This priority class is used to ensure that lead replicas are prioritized + over other replicas. +kind: PriorityClass +metadata: + name: lead-replica +value: 1 +--- apiVersion: apps/v1 kind: Deployment metadata: diff --git a/config/quick-start.yaml b/config/quick-start.yaml index 8a7cb29b..3ad01092 100644 --- a/config/quick-start.yaml +++ b/config/quick-start.yaml @@ -13590,6 +13590,14 @@ spec: selector: control-plane: controller-manager --- +apiVersion: scheduling.k8s.io/v1 +description: This priority class is used to ensure that lead replicas are prioritized + over other replicas. +kind: PriorityClass +metadata: + name: lead-replica +value: 1 +--- apiVersion: apps/v1 kind: Deployment metadata: