Skip to content

Commit

Permalink
feat: Use pod priority class to prioritize the lead replica. Fixes #269
Browse files (browse the repository at this point in the history)
… (#275)
  • Loading branch information
alexec authored Aug 23, 2021
1 parent 8bdfbf2 commit be2b63d
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 119 deletions.
12 changes: 9 additions & 3 deletions api/v1alpha1/step_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
"path/filepath"
"strconv"

"k8s.io/apimachinery/pkg/util/intstr"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -110,6 +111,10 @@ func (in Step) GetPodSpec(req GetPodSpecReq) corev1.PodSpec {
},
AllowPrivilegeEscalation: pointer.BoolPtr(false),
}
priorityClassName := ""
if req.Replica == 0 {
priorityClassName = "lead-replica"
}
return corev1.PodSpec{
Volumes: append(in.Spec.Volumes, volumes...),
RestartPolicy: in.Spec.RestartPolicy,
Expand All @@ -119,8 +124,9 @@ func (in Step) GetPodSpec(req GetPodSpecReq) corev1.PodSpec {
RunAsNonRoot: pointer.BoolPtr(true),
RunAsUser: pointer.Int64Ptr(9653),
},
Affinity: in.Spec.Affinity,
Tolerations: in.Spec.Tolerations,
PriorityClassName: priorityClassName,
Affinity: in.Spec.Affinity,
Tolerations: in.Spec.Tolerations,
InitContainers: []corev1.Container{
{
Name: CtrInit,
Expand Down
238 changes: 122 additions & 116 deletions api/v1alpha1/step_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
"encoding/json"
"fmt"
"testing"
"time"

Expand All @@ -13,131 +14,136 @@ import (
)

func TestStep_GetPodSpec(t *testing.T) {
env := []corev1.EnvVar{
{Name: "ARGO_DATAFLOW_CLUSTER_NAME", Value: "my-cluster"},
{Name: "ARGO_DATAFLOW_DEBUG", Value: "false"},
{Name: "ARGO_DATAFLOW_NAMESPACE", Value: "my-ns"},
{Name: "ARGO_DATAFLOW_PIPELINE_NAME", Value: "my-pl"},
{Name: "ARGO_DATAFLOW_REPLICA", Value: "1"},
{Name: "ARGO_DATAFLOW_STEP", Value: `{"metadata":{"creationTimestamp":null},"spec":{"name":"main","cat":{},"scale":{},"sidecar":{"resources":{}}},"status":{"phase":"","replicas":0,"lastScaledAt":null}}`},
{Name: "ARGO_DATAFLOW_UPDATE_INTERVAL", Value: "1m0s"},
{Name: "GODEBUG"},
}
mounts := []corev1.VolumeMount{{Name: "var-run-argo-dataflow", MountPath: "/var/run/argo-dataflow"}}
dropAll := &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{"all"},
},
AllowPrivilegeEscalation: pointer.BoolPtr(false),
}
tests := []struct {
name string
step Step
req GetPodSpecReq
want corev1.PodSpec
}{
{
"Cat",
Step{
Spec: StepSpec{
Name: "main",
Cat: &Cat{},
for replica, priorityClassName := range map[int]string{0: "lead-replica", 1: ""} {
t.Run(fmt.Sprintf("Replica%d", replica), func(t *testing.T) {
env := []corev1.EnvVar{
{Name: "ARGO_DATAFLOW_CLUSTER_NAME", Value: "my-cluster"},
{Name: "ARGO_DATAFLOW_DEBUG", Value: "false"},
{Name: "ARGO_DATAFLOW_NAMESPACE", Value: "my-ns"},
{Name: "ARGO_DATAFLOW_PIPELINE_NAME", Value: "my-pl"},
{Name: "ARGO_DATAFLOW_REPLICA", Value: fmt.Sprintf("%d", replica)},
{Name: "ARGO_DATAFLOW_STEP", Value: `{"metadata":{"creationTimestamp":null},"spec":{"name":"main","cat":{},"scale":{},"sidecar":{"resources":{}}},"status":{"phase":"","replicas":0,"lastScaledAt":null}}`},
{Name: "ARGO_DATAFLOW_UPDATE_INTERVAL", Value: "1m0s"},
{Name: "GODEBUG"},
}
mounts := []corev1.VolumeMount{{Name: "var-run-argo-dataflow", MountPath: "/var/run/argo-dataflow"}}
dropAll := &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Drop: []corev1.Capability{"all"},
},
},
GetPodSpecReq{
ClusterName: "my-cluster",
ImageFormat: "image-%s",
Namespace: "my-ns",
PipelineName: "my-pl",
Replica: 1,
RunnerImage: "my-runner",
PullPolicy: corev1.PullAlways,
StepStatus: StepStatus{Phase: StepRunning},
UpdateInterval: time.Minute,
Sidecar: Sidecar{Resources: standardResources},
},
corev1.PodSpec{
Containers: []corev1.Container{
{
Args: []string{"sidecar"},
Env: env,
Image: "my-runner",
ImagePullPolicy: corev1.PullAlways,
Name: "sidecar",
Lifecycle: &corev1.Lifecycle{PreStop: &corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/pre-stop?source=kubernetes",
Port: intstr.FromInt(3570),
Scheme: "HTTPS",
},
}},
Ports: []corev1.ContainerPort{{ContainerPort: 3570}},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{Path: "/ready", Port: intstr.FromInt(3570), Scheme: "HTTPS"},
},
AllowPrivilegeEscalation: pointer.BoolPtr(false),
}
tests := []struct {
name string
step Step
req GetPodSpecReq
want corev1.PodSpec
}{
{
"Cat",
Step{
Spec: StepSpec{
Name: "main",
Cat: &Cat{},
},
Resources: standardResources,
SecurityContext: dropAll,
VolumeMounts: mounts,
},
{
Args: []string{"cat"},
Image: "my-runner",
ImagePullPolicy: corev1.PullAlways,
Name: "main",
Lifecycle: &corev1.Lifecycle{PreStop: &corev1.Handler{
Exec: &corev1.ExecAction{Command: []string{"/var/run/argo-dataflow/prestop"}},
}},
Resources: standardResources,
SecurityContext: dropAll,
VolumeMounts: mounts,
GetPodSpecReq{
ClusterName: "my-cluster",
ImageFormat: "image-%s",
Namespace: "my-ns",
PipelineName: "my-pl",
Replica: int32(replica),
RunnerImage: "my-runner",
PullPolicy: corev1.PullAlways,
StepStatus: StepStatus{Phase: StepRunning},
UpdateInterval: time.Minute,
Sidecar: Sidecar{Resources: standardResources},
},
},
InitContainers: []corev1.Container{
{
Args: []string{"init"},
Env: env,
Image: "my-runner",
ImagePullPolicy: corev1.PullAlways,
Name: "init",
Resources: standardResources,
SecurityContext: dropAll,
VolumeMounts: append(mounts, corev1.VolumeMount{
Name: "ssh",
ReadOnly: true,
MountPath: "/.ssh",
}),
},
},
SecurityContext: &corev1.PodSecurityContext{
RunAsUser: pointer.Int64Ptr(9653),
RunAsNonRoot: pointer.BoolPtr(true),
},
Volumes: []corev1.Volume{
{
Name: "var-run-argo-dataflow",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
corev1.PodSpec{
Containers: []corev1.Container{
{
Args: []string{"sidecar"},
Env: env,
Image: "my-runner",
ImagePullPolicy: corev1.PullAlways,
Name: "sidecar",
Lifecycle: &corev1.Lifecycle{PreStop: &corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Path: "/pre-stop?source=kubernetes",
Port: intstr.FromInt(3570),
Scheme: "HTTPS",
},
}},
Ports: []corev1.ContainerPort{{ContainerPort: 3570}},
ReadinessProbe: &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{Path: "/ready", Port: intstr.FromInt(3570), Scheme: "HTTPS"},
},
},
Resources: standardResources,
SecurityContext: dropAll,
VolumeMounts: mounts,
},
{
Args: []string{"cat"},
Image: "my-runner",
ImagePullPolicy: corev1.PullAlways,
Name: "main",
Lifecycle: &corev1.Lifecycle{PreStop: &corev1.Handler{
Exec: &corev1.ExecAction{Command: []string{"/var/run/argo-dataflow/prestop"}},
}},
Resources: standardResources,
SecurityContext: dropAll,
VolumeMounts: mounts,
},
},
}, {
Name: "ssh",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: "ssh",
DefaultMode: pointer.Int32Ptr(0o644),
InitContainers: []corev1.Container{
{
Args: []string{"init"},
Env: env,
Image: "my-runner",
ImagePullPolicy: corev1.PullAlways,
Name: "init",
Resources: standardResources,
SecurityContext: dropAll,
VolumeMounts: append(mounts, corev1.VolumeMount{
Name: "ssh",
ReadOnly: true,
MountPath: "/.ssh",
}),
},
},
PriorityClassName: priorityClassName,
SecurityContext: &corev1.PodSecurityContext{
RunAsUser: pointer.Int64Ptr(9653),
RunAsNonRoot: pointer.BoolPtr(true),
},
Volumes: []corev1.Volume{
{
Name: "var-run-argo-dataflow",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
}, {
Name: "ssh",
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: "ssh",
DefaultMode: pointer.Int32Ptr(0o644),
},
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := json.MarshalIndent(tt.step.GetPodSpec(tt.req), "", " ")
want, _ := json.MarshalIndent(tt.want, "", " ")
assert.Equal(t, string(want), string(got))
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, _ := json.MarshalIndent(tt.step.GetPodSpec(tt.req), "", " ")
want, _ := json.MarshalIndent(tt.want, "", " ")
assert.Equal(t, string(want), string(got))
})
}
})
}
}
8 changes: 8 additions & 0 deletions config/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13606,6 +13606,14 @@ spec:
selector:
app: testapi
---
apiVersion: scheduling.k8s.io/v1
description: This priority class is used to ensure that lead replicas are prioritized
over other replicas.
kind: PriorityClass
metadata:
name: lead-replica
value: 1
---
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down
8 changes: 8 additions & 0 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13590,6 +13590,14 @@ spec:
selector:
control-plane: controller-manager
---
apiVersion: scheduling.k8s.io/v1
description: This priority class is used to ensure that lead replicas are prioritized
over other replicas.
kind: PriorityClass
metadata:
name: lead-replica
value: 1
---
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down
1 change: 1 addition & 0 deletions config/default/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ bases:

resources:
- ssh-configmap.yaml
- lead-replica-priorityclass.yaml

patchesStrategicMerge:
# Protect the /metrics endpoint by putting it behind auth.
Expand Down
6 changes: 6 additions & 0 deletions config/default/lead-replica-priorityclass.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# PriorityClass for the lead replica of a step, so that it is prioritized over
# the step's other replicas.
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
# Keep in sync with the PriorityClassName that Step.GetPodSpec sets when
# req.Replica == 0 ("lead-replica"); other replicas get no priority class.
name: lead-replica
# NOTE(review): presumably any value above the priority of pods that have no
# priority class is sufficient here — confirm against the cluster's defaults.
value: 1
description: "This priority class is used to ensure that lead replicas are prioritized over other replicas."
8 changes: 8 additions & 0 deletions config/dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13606,6 +13606,14 @@ spec:
selector:
app: testapi
---
apiVersion: scheduling.k8s.io/v1
description: This priority class is used to ensure that lead replicas are prioritized
over other replicas.
kind: PriorityClass
metadata:
name: lead-replica
value: 1
---
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down
8 changes: 8 additions & 0 deletions config/quick-start.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13590,6 +13590,14 @@ spec:
selector:
control-plane: controller-manager
---
apiVersion: scheduling.k8s.io/v1
description: This priority class is used to ensure that lead replicas are prioritized
over other replicas.
kind: PriorityClass
metadata:
name: lead-replica
value: 1
---
apiVersion: apps/v1
kind: Deployment
metadata:
Expand Down

0 comments on commit be2b63d

Please sign in to comment.