Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Adding node lister and pod lister to get info from cache to compute s…
Browse files Browse the repository at this point in the history
…pread etc

Signed-off-by: Ansu Varghese <[email protected]>
  • Loading branch information
aavarghese committed May 10, 2021
1 parent a1392ff commit 6d8e1bf
Show file tree
Hide file tree
Showing 13 changed files with 242 additions and 46 deletions.
11 changes: 5 additions & 6 deletions pkg/common/scheduler/statefulset/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1 "k8s.io/client-go/listers/core/v1"
gtesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

listers "knative.dev/eventing/pkg/reconciler/testing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"

Expand Down Expand Up @@ -214,8 +213,8 @@ func TestAutoscaler(t *testing.T) {
ctx, _ := setupFakeContext(t)

vpodClient := tscheduler.NewVPodClient()
nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}))
stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, nodeLister)
ls := listers.NewListers(nil)
stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister())

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
_, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{})
Expand Down Expand Up @@ -258,8 +257,8 @@ func TestAutoscalerScaleDownToZero(t *testing.T) {
})

vpodClient := tscheduler.NewVPodClient()
nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}))
stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MAXFILLUP, nodeLister)
ls := listers.NewListers(nil)
stateAccessor := newStateBuilder(ctx, vpodClient.List, 10, MAXFILLUP, ls.GetNodeLister())

sfsClient := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs)
_, err := sfsClient.Create(ctx, makeStatefulset(testNs, sfsName, 10), metav1.CreateOptions{})
Expand Down
34 changes: 21 additions & 13 deletions pkg/common/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ import (

"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
corev1 "k8s.io/client-go/listers/core/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/integer"

Expand All @@ -41,6 +39,7 @@ import (

duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1"
"knative.dev/eventing-kafka/pkg/common/scheduler"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
)

type SchedulerPolicyType string
Expand All @@ -63,22 +62,24 @@ func NewScheduler(ctx context.Context,
refreshPeriod time.Duration,
capacity int32,
schedulerPolicy SchedulerPolicyType,
nodeLister corev1.NodeLister) scheduler.Scheduler {
nodeLister corev1listers.NodeLister) scheduler.Scheduler {

stateAccessor := newStateBuilder(ctx, lister, capacity, schedulerPolicy, nodeLister)
autoscaler := NewAutoscaler(ctx, namespace, name, stateAccessor, refreshPeriod, capacity)
podInformer := podinformer.Get(ctx)
podLister := podInformer.Lister().Pods(namespace)

go autoscaler.Start(ctx)

return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler)
return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister)
}

// StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods
type StatefulSetScheduler struct {
logger *zap.SugaredLogger
statefulSetName string
statefulSetClient clientappsv1.StatefulSetInterface
podClient clientcorev1.PodInterface
podLister corev1listers.PodNamespaceLister
vpodLister scheduler.VPodLister
lock sync.Locker
stateAccessor stateAccessor
Expand All @@ -96,13 +97,13 @@ func NewStatefulSetScheduler(ctx context.Context,
namespace, name string,
lister scheduler.VPodLister,
stateAccessor stateAccessor,
autoscaler Autoscaler) scheduler.Scheduler {
autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister) scheduler.Scheduler {

scheduler := &StatefulSetScheduler{
logger: logging.FromContext(ctx),
statefulSetName: name,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
podClient: kubeclient.Get(ctx).CoreV1().Pods(namespace),
podLister: podlister,
vpodLister: lister,
pending: make(map[types.NamespacedName]int32),
lock: new(sync.Mutex),
Expand Down Expand Up @@ -177,7 +178,7 @@ func (s *StatefulSetScheduler) Schedule(vpod scheduler.VPod) ([]duckv1alpha1.Pla
// Need more => scale up
logger.Infow("scaling up", zap.Int32("vreplicas", tr), zap.Int32("new vreplicas", vpod.GetVReplicas()))
if state.schedulerPolicy == EVENSPREAD {
//spreadVal is the minimum number of replicas to be placed in each zone for high availability
//spreadVal is the maximum number of replicas to be placed in each zone for high availability
spreadVal = int32(math.Ceil(float64(vpod.GetVReplicas()) / float64(state.numZones)))
logger.Infow("number of replicas per zone", zap.Int32("spreadVal", spreadVal))
placements, left = s.addReplicasEvenSpread(state, vpod.GetVReplicas()-tr, placements, spreadVal)
Expand Down Expand Up @@ -236,13 +237,13 @@ func (s *StatefulSetScheduler) removeReplicasEvenSpread(diff int32, placements [
logger.Info(zap.String("zoneName", zoneNames[i]), zap.Int32("totalInZone", totalInZone))

placementOrdinals := placementsByZone[zoneNames[i]]
for j := 0; j < len(placementOrdinals); j++ { //iterating through all existing pods belonging to a single zone
for j := len(placementOrdinals) - 1; j >= 0; j-- { //iterating through all existing pods belonging to a single zone from larger cardinal to smaller
ordinal := placementOrdinals[j]
placement := s.getPlacementFromPodOrdinal(placements, ordinal)

if diff > 0 && totalInZone >= evenSpread {
deallocation := integer.Int32Min(diff, integer.Int32Min(placement.VReplicas, totalInZone-evenSpread))
logger.Info(zap.Int32("diff", diff), zap.Int32("deallocation", deallocation))
logger.Info(zap.Int32("diff", diff), zap.Int32("ordinal", ordinal), zap.Int32("deallocation", deallocation))

if deallocation > 0 && deallocation < placement.VReplicas {
newPlacements = append(newPlacements, duckv1alpha1.Placement{
Expand All @@ -252,16 +253,23 @@ func (s *StatefulSetScheduler) removeReplicasEvenSpread(diff int32, placements [
})
diff -= deallocation
totalInZone -= deallocation
} else {
} else if deallocation >= placement.VReplicas {
diff -= placement.VReplicas
totalInZone -= placement.VReplicas
} else {
newPlacements = append(newPlacements, duckv1alpha1.Placement{
PodName: placement.PodName,
ZoneName: placement.ZoneName,
VReplicas: placement.VReplicas,
})
}
} else {
newPlacements = append(newPlacements, duckv1alpha1.Placement{
PodName: placement.PodName,
ZoneName: placement.ZoneName,
VReplicas: placement.VReplicas,
})

}
}
}
Expand Down Expand Up @@ -401,7 +409,7 @@ func (s *StatefulSetScheduler) addReplicasEvenSpread(state *state, diff int32, p
}

func (s *StatefulSetScheduler) getZoneNameFromPod(state *state, podName string) (zoneName string, err error) {
pod, err := s.podClient.Get(context.Background(), podName, metav1.GetOptions{})
pod, err := s.podLister.Get(podName)
if err != nil {
return zoneName, err
}
Expand Down
24 changes: 13 additions & 11 deletions pkg/common/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
corelister "k8s.io/client-go/listers/core/v1"
gtesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

kubeclient "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake"
Expand All @@ -41,6 +39,7 @@ import (
duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1"
"knative.dev/eventing-kafka/pkg/common/scheduler"
tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing"
listers "knative.dev/eventing/pkg/reconciler/testing/v1"
)

const (
Expand Down Expand Up @@ -284,7 +283,7 @@ func TestStatefulsetScheduler(t *testing.T) {
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1},
{PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 1},
{PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 1},
{PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 1},
},
schedulerPolicy: EVENSPREAD,
Expand All @@ -301,7 +300,7 @@ func TestStatefulsetScheduler(t *testing.T) {
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 2},
{PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 2},
{PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 2},
{PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 3},
},
schedulerPolicy: EVENSPREAD,
Expand All @@ -311,36 +310,39 @@ func TestStatefulsetScheduler(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, _ := setupFakeContext(t)

nodelist := make([]runtime.Object, 0, numZones)
podlist := make([]runtime.Object, 0, tc.replicas)
vpodClient := tscheduler.NewVPodClient()

if tc.schedulerPolicy == EVENSPREAD {
for i := int32(0); i < numZones; i++ {
nodeName := "node" + fmt.Sprint(i)
zoneName := "zone" + fmt.Sprint(i)
_, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{})
node, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
nodelist = append(nodelist, node)
}
for i := int32(0); i < tc.replicas; i++ {
nodeName := "node" + fmt.Sprint(i)
podName := sfsName + "-" + fmt.Sprint(i)
_, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{})
pod, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
podlist = append(podlist, pod)
}
}

_, err := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs).Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}

nodeLister := corelister.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}))
sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, nodeLister)
s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil).(*StatefulSetScheduler)
lsn := listers.NewListers(nodelist)
sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, lsn.GetNodeLister())
lsp := listers.NewListers(podlist)
s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler)

// Give some time for the informer to notify the scheduler and set the number of replicas
time.Sleep(200 * time.Millisecond)
Expand Down
12 changes: 5 additions & 7 deletions pkg/common/scheduler/statefulset/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import (
"context"

"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1 "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing-kafka/pkg/common/scheduler"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"
)

Expand Down Expand Up @@ -136,16 +135,15 @@ func (s *stateBuilder) State() (*state, error) {

if s.schedulerPolicy == EVENSPREAD {
//TODO: need a node watch to see if # nodes/ # zones have gone up or down
nodes, err := kubeclient.Get(s.ctx).CoreV1().Nodes().List(s.ctx, metav1.ListOptions{})
// nodes, err := s.nodeLister.List(labels.Everything()) // Not working yet!!
nodes, err := s.nodeLister.List(labels.Everything())
if err != nil {
return nil, err
}

nodeToZoneMap := make(map[string]string, len(nodes.Items))
nodeToZoneMap := make(map[string]string, len(nodes))
zoneMap := make(map[string]struct{})
for i := 0; i < len(nodes.Items); i++ {
node := nodes.Items[i]
for i := 0; i < len(nodes); i++ {
node := nodes[i]
zoneName, ok := node.GetLabels()[ZoneLabel]
if !ok {
continue //ignore node that doesn't have zone info (maybe a test setup or control node)
Expand Down
12 changes: 7 additions & 5 deletions pkg/common/scheduler/statefulset/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/apimachinery/pkg/runtime"
duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1"
tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing"
listers "knative.dev/eventing/pkg/reconciler/testing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
)

Expand Down Expand Up @@ -98,6 +98,7 @@ func TestStateBuilder(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx, _ := setupFakeContext(t)
vpodClient := tscheduler.NewVPodClient()
nodelist := make([]runtime.Object, 0, len(tc.nodes))

for i, placements := range tc.vpods {
vpodName := fmt.Sprint("vpod-name-", i)
Expand All @@ -108,15 +109,16 @@ func TestStateBuilder(t *testing.T) {

if tc.schedulerPolicy == EVENSPREAD {
for i := 0; i < len(tc.nodes); i++ {
_, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, tc.nodes[i], metav1.CreateOptions{})
node, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, tc.nodes[i], metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
nodelist = append(nodelist, node)
}
}

nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}))
stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, nodeLister)
ls := listers.NewListers(nodelist)
stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, ls.GetNodeLister())
state, err := stateBuilder.State()
if err != nil {
t.Fatal("unexpected error", err)
Expand Down
7 changes: 3 additions & 4 deletions pkg/source/reconciler/mtsource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"time"

"github.com/kelseyhightower/envconfig"
corev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand All @@ -37,6 +35,7 @@ import (
kafkainformer "knative.dev/eventing-kafka/pkg/client/injection/informers/sources/v1beta1/kafkasource"
"knative.dev/eventing-kafka/pkg/client/injection/reconciler/sources/v1beta1/kafkasource"
scheduler "knative.dev/eventing-kafka/pkg/common/scheduler/statefulset"
nodeinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/node"
)

type envConfig struct {
Expand All @@ -57,6 +56,7 @@ func NewController(
}

kafkaInformer := kafkainformer.Get(ctx)
nodeInformer := nodeinformer.Get(ctx)

c := &Reconciler{
KubeClientSet: kubeclient.Get(ctx),
Expand All @@ -73,8 +73,7 @@ func NewController(
sourcesv1beta1.RegisterAlternateKafkaConditionSet(sourcesv1beta1.KafkaMTSourceCondSet)

rp := time.Duration(env.SchedulerRefreshPeriod) * time.Second
nodeLister := corev1.NewNodeLister(cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}))
c.scheduler = scheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity, env.SchedulerPolicy, nodeLister)
c.scheduler = scheduler.NewScheduler(ctx, system.Namespace(), mtadapterName, c.vpodLister, rp, env.PodCapacity, env.SchedulerPolicy, nodeInformer.Lister())

logging.FromContext(ctx).Info("Setting up kafka event handlers")
kafkaInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand Down
27 changes: 27 additions & 0 deletions third_party/VENDOR-LICENSE/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
Copyright (c) 2009 The Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Loading

0 comments on commit 6d8e1bf

Please sign in to comment.