Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Lister
Browse files Browse the repository at this point in the history
Signed-off-by: Ansu Varghese <[email protected]>
  • Loading branch information
aavarghese committed May 10, 2021
1 parent c6bf833 commit 70145ac
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 16 deletions.
9 changes: 5 additions & 4 deletions pkg/common/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ func NewScheduler(ctx context.Context,

stateAccessor := newStateBuilder(ctx, lister, capacity, schedulerPolicy, nodeLister)
autoscaler := NewAutoscaler(ctx, namespace, name, stateAccessor, refreshPeriod, capacity)
podInformer := podinformer.Get(ctx)
podLister := podInformer.Lister().Pods(namespace)

go autoscaler.Start(ctx)

return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler)
return NewStatefulSetScheduler(ctx, namespace, name, lister, stateAccessor, autoscaler, podLister)
}

// StatefulSetScheduler is a scheduler placing VPod into statefulset-managed set of pods
Expand All @@ -95,14 +97,13 @@ func NewStatefulSetScheduler(ctx context.Context,
namespace, name string,
lister scheduler.VPodLister,
stateAccessor stateAccessor,
autoscaler Autoscaler) scheduler.Scheduler {
autoscaler Autoscaler, podlister corev1listers.PodNamespaceLister) scheduler.Scheduler {

podInformer := podinformer.Get(ctx)
scheduler := &StatefulSetScheduler{
logger: logging.FromContext(ctx),
statefulSetName: name,
statefulSetClient: kubeclient.Get(ctx).AppsV1().StatefulSets(namespace),
podLister: podInformer.Lister().Pods(namespace),
podLister: podlister,
vpodLister: lister,
pending: make(map[types.NamespacedName]int32),
lock: new(sync.Mutex),
Expand Down
20 changes: 12 additions & 8 deletions pkg/common/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestStatefulsetScheduler(t *testing.T) {
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 1},
{PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 1},
{PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 1},
{PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 1},
},
schedulerPolicy: EVENSPREAD,
Expand All @@ -300,7 +300,7 @@ func TestStatefulsetScheduler(t *testing.T) {
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", ZoneName: "zone0", VReplicas: 2},
{PodName: "statefulset-name-2", ZoneName: "zone1", VReplicas: 2},
{PodName: "statefulset-name-1", ZoneName: "zone1", VReplicas: 2},
{PodName: "statefulset-name-3", ZoneName: "zone2", VReplicas: 3},
},
schedulerPolicy: EVENSPREAD,
Expand All @@ -310,35 +310,39 @@ func TestStatefulsetScheduler(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, _ := setupFakeContext(t)

nodelist := make([]runtime.Object, 0, numZones)
podlist := make([]runtime.Object, 0, tc.replicas)
vpodClient := tscheduler.NewVPodClient()

if tc.schedulerPolicy == EVENSPREAD {
for i := int32(0); i < numZones; i++ {
nodeName := "node" + fmt.Sprint(i)
zoneName := "zone" + fmt.Sprint(i)
_, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{})
node, err := kubeclient.Get(ctx).CoreV1().Nodes().Create(ctx, makeNode(nodeName, zoneName), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
nodelist = append(nodelist, node)
}
for i := int32(0); i < tc.replicas; i++ {
nodeName := "node" + fmt.Sprint(i)
podName := sfsName + "-" + fmt.Sprint(i)
_, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{})
pod, err := kubeclient.Get(ctx).CoreV1().Pods(testNs).Create(ctx, makePod(testNs, podName, nodeName), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
podlist = append(podlist, pod)
}
}

_, err := kubeclient.Get(ctx).AppsV1().StatefulSets(testNs).Create(ctx, makeStatefulset(testNs, sfsName, tc.replicas), metav1.CreateOptions{})
if err != nil {
t.Fatal("unexpected error", err)
}
ls := listers.NewListers(nil)
sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, ls.GetNodeLister())
s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil).(*StatefulSetScheduler)
lsn := listers.NewListers(nodelist)
sa := newStateBuilder(ctx, vpodClient.List, 10, tc.schedulerPolicy, lsn.GetNodeLister())
lsp := listers.NewListers(podlist)
s := NewStatefulSetScheduler(ctx, testNs, sfsName, vpodClient.List, sa, nil, lsp.GetPodLister().Pods(testNs)).(*StatefulSetScheduler)

// Give some time for the informer to notify the scheduler and set the number of replicas
time.Sleep(200 * time.Millisecond)
Expand Down
8 changes: 4 additions & 4 deletions pkg/common/scheduler/statefulset/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ import (

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
duckv1alpha1 "knative.dev/eventing-kafka/pkg/apis/duck/v1alpha1"
tscheduler "knative.dev/eventing-kafka/pkg/common/scheduler/testing"
listers "knative.dev/eventing/pkg/reconciler/testing/v1"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"k8s.io/apimachinery/pkg/runtime"
)

func TestStateBuilder(t *testing.T) {
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestStateBuilder(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
ctx, _ := setupFakeContext(t)
vpodClient := tscheduler.NewVPodClient()
correctURI := make([]runtime.Object, 0, len(tc.nodes))
nodelist := make([]runtime.Object, 0, len(tc.nodes))

for i, placements := range tc.vpods {
vpodName := fmt.Sprint("vpod-name-", i)
Expand All @@ -113,11 +113,11 @@ func TestStateBuilder(t *testing.T) {
if err != nil {
t.Fatal("unexpected error", err)
}
correctURI = append(correctURI, node)
nodelist = append(nodelist, node)
}
}

ls := listers.NewListers(correctURI)
ls := listers.NewListers(nodelist)
stateBuilder := newStateBuilder(ctx, vpodClient.List, int32(10), tc.schedulerPolicy, ls.GetNodeLister())
state, err := stateBuilder.State()
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,3 +211,7 @@ func (l *Listers) GetCustomResourceDefinitionLister() apiextensionsv1listers.Cus
func (l *Listers) GetNodeLister() corev1listers.NodeLister {
return corev1listers.NewNodeLister(l.indexerFor(&corev1.Node{}))
}

func (l *Listers) GetPodLister() corev1listers.PodLister {
return corev1listers.NewPodLister(l.indexerFor(&corev1.Pod{}))
}

0 comments on commit 70145ac

Please sign in to comment.