Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
[scheduler, mtsource] Initial HA support for MT Kafka source (#587)
Browse files Browse the repository at this point in the history
* Even spread for HA implementation

Signed-off-by: Ansu Varghese <[email protected]>

* Ignore nodes that don't have a zone label in state

Signed-off-by: Ansu Varghese <[email protected]>

* Fix for not removing enough replicas

Signed-off-by: Ansu Varghese <[email protected]>

* Changing SchedulerPolicyType from int to string

Signed-off-by: Ansu Varghese <[email protected]>

* Adding node lister and pod lister to get info from cache to compute spread etc

Signed-off-by: Ansu Varghese <[email protected]>

* Reverting scheduler policy type to default strategy

Signed-off-by: Ansu Varghese <[email protected]>
  • Loading branch information
aavarghese authored May 12, 2021
1 parent b5dc86f commit 93bdb1c
Show file tree
Hide file tree
Showing 14 changed files with 719 additions and 67 deletions.
1 change: 0 additions & 1 deletion config/source/multi/500-controller-service.yaml

This file was deleted.

15 changes: 15 additions & 0 deletions config/source/multi/deployments/adapter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,18 @@ spec:
containerPort: 8008

terminationGracePeriodSeconds: 10
affinity:
podAntiAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- podAffinityTerm:
labelSelector:
matchLabels:
control-plane: kafkasource-mt-adapter
topologyKey: kubernetes.io/hostname
weight: 50
- podAffinityTerm:
labelSelector:
matchLabels:
control-plane: kafkasource-mt-adapter
topologyKey: topology.kubernetes.io/zone
weight: 50
6 changes: 5 additions & 1 deletion config/source/multi/deployments/controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ spec:

# How often (in seconds) the autoscaler tries to scale down the statefulset.
- name: AUTOSCALER_REFRESH_PERIOD
value: '10'
value: '100'

# The number of virtual replicas this pod can handle.
- name: POD_CAPACITY
value: '100'

# The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list)
- name: SCHEDULER_POLICY_TYPE
value: 'MAXFILLUP'

resources:
requests:
cpu: 20m
Expand Down
1 change: 1 addition & 0 deletions config/source/multi/roles/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ rules:
- events
- configmaps
- secrets
- nodes
verbs: *everything

# let the webhook label the appropriate namespace
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/duck/v1alpha1/placement_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ type Placement struct {
// PodName is the name of the pod where the resource is placed
PodName string `json:"podName,omitempty"`

// ZoneName is the name of the zone where the pod is located
ZoneName string `json:"zoneName,omitempty"`

// VReplicas is the number of virtual replicas assigned to in the pod
VReplicas int32 `json:"vreplicas,omitempty"`
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/common/scheduler/statefulset/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ func (a *autoscaler) doautoscale(ctx context.Context, attemptScaleDown bool, pen
// The number of replicas may be lower than the last ordinal, for instance
// when the statefulset is manually scaled down. In that case, replicas above
// scale.Spec.Replicas have not been considered when scheduling vreplicas.
// Adjust accordingly
pending -= state.freeCapacity()
// Adjust accordingly (applicable only for MAXFILLUP scheduling policy and not for HA)
if state.schedulerPolicy != EVENSPREAD {
pending -= state.freeCapacity()
}

// Still need more?
if pending > 0 {
Expand Down
49 changes: 38 additions & 11 deletions pkg/common/scheduler/statefulset/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"k8s.io/apimachinery/pkg/runtime"
gtesting "k8s.io/client-go/testing"

listers "knative.dev/eventing/pkg/reconciler/testing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
"knative.dev/pkg/logging"

duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1"
"knative.dev/eventing-kafka/pkg/common/scheduler"
Expand All @@ -39,12 +39,13 @@ const (

func TestAutoscaler(t *testing.T) {
testCases := []struct {
name string
replicas int32
vpods []scheduler.VPod
pendings int32
scaleDown bool
wantReplicas int32
name string
replicas int32
vpods []scheduler.VPod
pendings int32
scaleDown bool
wantReplicas int32
schedulerPolicy SchedulerPolicyType
}{
{
name: "no replicas, no placements, no pending",
Expand Down Expand Up @@ -181,17 +182,42 @@ func TestAutoscaler(t *testing.T) {
pendings: int32(8),
wantReplicas: int32(3),
},
{
name: "no replicas, with placements, with pending, enough capacity",
replicas: int32(0),
vpods: []scheduler.VPod{
tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{
{PodName: "pod-0", VReplicas: int32(8)},
{PodName: "pod-1", VReplicas: int32(7)}}),
},
pendings: int32(3),
wantReplicas: int32(3),
schedulerPolicy: EVENSPREAD,
},
{
name: "with replicas, with placements, with pending, enough capacity",
replicas: int32(2),
vpods: []scheduler.VPod{
tscheduler.NewVPod(testNs, "vpod-1", 15, []duckv1alpha1.Placement{
{PodName: "pod-0", VReplicas: int32(8)},
{PodName: "pod-1", VReplicas: int32(7)}}),
},
pendings: int32(3),
wantReplicas: int32(3),
schedulerPolicy: EVENSPREAD,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, _ := setupFakeContext(t)

vpodClient := tscheduler.NewVPodClient()
stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10)
ls := listers.NewListers(nil)
stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister())

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
_, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, tc.replicas), metav1.CreateOptions{})
_, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down Expand Up @@ -231,10 +257,11 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {
})

vpodClient := tscheduler.NewVPodClient()
stateAccessor := newStateBuilder(logging.FromContext(ctx), vpodClient.List, 10)
ls := listers.NewListers(nil)
stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MAXFILLUP, ls.GetNodeLister())

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
_, err := sfsClient.Create(ctx, makeStatefulset(ctx, testNs, sfsName, 10), metav1.CreateOptions{})
_, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down
Loading

0 comments on commit 93bdb1c

Please sign in to comment.