From 70145ac6f478643bee14c3ff59e09404f3bfdcf1 Mon Sep 17 00:00:00 2001 From: Ansu Varghese Date: Mon, 10 May 2021 17:16:39 -0400 Subject: [PATCH] Lister Signed-off-by: Ansu Varghese --- pkg/common/scheduler/statefulset/scheduler.go | 9 +++++---- .../scheduler/statefulset/scheduler_test.go | 20 +++++++++++-------- .../scheduler/statefulset/state_test.go | 8 ++++---- .../pkg/reconciler/testing/v1/listers.go | 4 ++++ 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/pkg/common/scheduler/statefulset/scheduler.go b/pkg/common/scheduler/statefulset/scheduler.go index ec8c176190..4ff7299867 100644 --- a/pkg/common/scheduler/statefulset/scheduler.go +++ b/pkg/common/scheduler/statefulset/scheduler.go @@ -66,10 +66,12 @@ func NewScheduler(ctx context.Context, stateAccessor := newStateBuilder(ctx, lister, capacity, schedulerPolicy, nodeLister) autoscaler := NewAutoscaler(ctx, namespace, name, stateAccessor, refreshPeriod, capacity) + podInformer := podinformer.Get(ctx) + podLister := podInformer.Lister().Pods(namespace) go autoscaler.Start(ctx) - return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler) + return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister) } // StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods @@ -95,14 +97,13 @@ func NewStatefulSetScheduler(ctx context.Context, namespace, name string, lister scheduler.VPodLister, stateAccessor stateAccessor, - autoscaler Autoscaler) scheduler.Scheduler { + autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister) scheduler.Scheduler { - podInformer := podinformer.Get(ctx) scheduler := &StatefulSetScheduler{ logger: logging.FromContext(ctx), statefulSetName: name, statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace), - podLister: podInformer.Lister().Pods(namespace), + podLister: podlister, vpodLister: lister, pending: make(map[types.NamespacedName]int32), lock: new(sync.Mutex), diff --git a/pkg/common/scheduler/statefulset/scheduler_test.go b/pkg/common/scheduler/statefulset/scheduler_test.go index e9fdce6817..0d68a1afdc 100644 --- a/pkg/common/scheduler/statefulset/scheduler_test.go +++ b/pkg/common/scheduler/statefulset/scheduler_test.go @@ -283,7 +283,7 @@ func TestStatefulsetScheduler(t *testing.T) { }, expected: []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1}, - {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 1}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 1}, {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 1}, }, schedulerPolicy: EVENSPREAD, @@ -300,7 +300,7 @@ func TestStatefulsetScheduler(t *testing.T) { }, expected: []duckv1alpha1.Placement{ {PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 2}, - {PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 2}, + {PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 2}, {PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 3}, }, schedulerPolicy: EVENSPREAD, @@ -310,25 +310,28 @@ func TestStatefulsetScheduler(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx, _ := setupFakeContext(t) - + nodelist := make([]runtime.Object, 0, numZones) + podlist := make([]runtime.Object, 0, tc.replicas) vpodClient := tscheduler.NewVPodClient() if tc.schedulerPolicy == EVENSPREAD { for i := int32(0); i < numZones; i++ { nodeName := "node" + fmt.Sprint(i) zoneName := "zone" + fmt.Sprint(i) - _, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{}) + node, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } + nodelist = append(nodelist, node) } for i := int32(0); i < tc.replicas; i++ { nodeName := "node" + fmt.Sprint(i) podName := sfsName + "-" + fmt.Sprint(i) - _, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{}) + pod, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{}) if err != nil { t.Fatal("unexpected error", err) } + podlist = append(podlist, pod) } } @@ -336,9 +339,10 @@ func TestStatefulsetScheduler(t *testing.T) { if err != nil { t.Fatal("unexpected error", err) } - ls := listers.NewListers(nil) - sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister()) - s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil).(*StatefulSetScheduler) + lsn := listers.NewListers(nodelist) + sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, lsn.GetNodeLister()) + lsp := listers.NewListers(podlist) + s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler) // Give some time for the informer to notify the scheduler and set the number of replicas time.Sleep(200 * time.Millisecond) diff --git a/pkg/common/scheduler/statefulset/state_test.go b/pkg/common/scheduler/statefulset/state_test.go index 06eff7341f..e6bcdf3610 100644 --- a/pkg/common/scheduler/statefulset/state_test.go +++ b/pkg/common/scheduler/statefulset/state_test.go @@ -23,11 +23,11 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1" tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing" listers "knative.dev/eventing/pkg/reconciler/testing/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" - "k8s.io/apimachinery/pkg/runtime" ) func TestStateBuilder(t *testing.T) { @@ -98,7 +98,7 @@ func TestStateBuilder(t *testing.T) { t.Run(tc.name, func(t *testing.T) { ctx, _ := setupFakeContext(t) vpodClient := tscheduler.NewVPodClient() - correctURI := make([]runtime.Object, 0, len(tc.nodes)) + nodelist := make([]runtime.Object, 0, len(tc.nodes)) for i, placements := range tc.vpods { vpodName := fmt.Sprint("vpod-name-", i) @@ -113,11 +113,11 @@ func TestStateBuilder(t *testing.T) { if err != nil { t.Fatal("unexpected error", err) } - correctURI = append(correctURI, node) + nodelist = append(nodelist, node) } } - ls := listers.NewListers(correctURI) + ls := listers.NewListers(nodelist) stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, ls.GetNodeLister()) state, err := stateBuilder.State() if err != nil { diff --git a/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/listers.go b/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/listers.go index 195b32972b..cc6d07494b 100644 --- a/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/listers.go +++ b/vendor/knative.dev/eventing/pkg/reconciler/testing/v1/listers.go @@ -211,3 +211,7 @@ func (l *Listers) GetCustomResourceDefinitionLister() apiextensionsv1listers.Cus func (l *Listers) GetNodeLister() corev1listers.NodeLister { return corev1listers.NewNodeLister(l.indexerFor(&corev1.Node{})) } + +func (l *Listers) GetPodLister() corev1listers.PodLister { + return corev1listers.NewPodLister(l.indexerFor(&corev1.Pod{})) +}