Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
Support for PodGroup updates (#207)
Browse files Browse the repository at this point in the history
* Support for PodGroup updates

Signed-off-by: Yuki Iwai <[email protected]>

* Add the logic to check PodGroup changes

Signed-off-by: Yuki Iwai <[email protected]>

* make podgroup pointer

Signed-off-by: Yuki Iwai <[email protected]>

Signed-off-by: Yuki Iwai <[email protected]>
  • Loading branch information
tenzen-y authored Jan 20, 2023
1 parent 4d97a27 commit 9309adf
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/go-logr/logr v1.2.3
github.com/google/go-cmp v0.5.8
github.com/prometheus/client_golang v1.12.2
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.8.0
Expand Down Expand Up @@ -37,7 +38,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
Expand Down
43 changes: 27 additions & 16 deletions pkg/controller.v1/common/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,52 @@ import (

apiv1 "github.com/kubeflow/common/pkg/apis/common/v1"

"github.com/google/go-cmp/cmp"
log "github.com/sirupsen/logrus"
policyapi "k8s.io/api/policy/v1beta1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type FillPodGroupSpecFunc func(object metav1.Object) error

func (jc *JobController) SyncPodGroup(job metav1.Object, specFunc FillPodGroupSpecFunc) (metav1.Object, error) {
pgctl := jc.PodGroupControl

// Check whether podGroup exists or not
podGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName())
if err == nil {
// update podGroup for gang scheduling
oldPodGroup := &podGroup
if err = specFunc(podGroup); err != nil {
return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(podGroup), err)
}
if diff := cmp.Diff(oldPodGroup, podGroup); len(diff) != 0 {
return podGroup, pgctl.UpdatePodGroup(podGroup.(client.Object))
}
return podGroup, nil
} else if client.IgnoreNotFound(err) != nil {
return nil, fmt.Errorf("unable to get PodGroup: %v", err)
}

// create podGroup for gang scheduling
toCreatePodGroup := pgctl.NewEmptyPodGroup()
toCreatePodGroup.SetName(job.GetName())
toCreatePodGroup.SetNamespace(job.GetNamespace())
toCreatePodGroup.SetAnnotations(job.GetAnnotations())
toCreatePodGroup.SetOwnerReferences([]metav1.OwnerReference{*jc.GenOwnerReference(job)})
if err = specFunc(toCreatePodGroup); err != nil {
return nil, fmt.Errorf("unable to fill the spec of PodGroup: %v", err)
}
return nil, fmt.Errorf("unable to get a PodGroup: %v", err)
} else {
// create podGroup for gang scheduling
newPodGroup := pgctl.NewEmptyPodGroup()
newPodGroup.SetName(job.GetName())
newPodGroup.SetNamespace(job.GetNamespace())
newPodGroup.SetAnnotations(job.GetAnnotations())
newPodGroup.SetOwnerReferences([]metav1.OwnerReference{*jc.GenOwnerReference(job)})
if err = specFunc(newPodGroup); err != nil {
return nil, fmt.Errorf("unable to fill the spec of PodGroup, '%v': %v", klog.KObj(newPodGroup), err)
}

err = pgctl.CreatePodGroup(toCreatePodGroup)
if err != nil {
return podGroup, fmt.Errorf("unable to create PodGroup: %v", err)
err = pgctl.CreatePodGroup(newPodGroup)
if err != nil {
return podGroup, fmt.Errorf("unable to create PodGroup: %v", err)
}
createdPodGroupsCount.Inc()
}
createdPodGroupsCount.Inc()

createdPodGroup, err := pgctl.GetPodGroup(job.GetNamespace(), job.GetName())
if err != nil {
Expand Down
37 changes: 31 additions & 6 deletions pkg/controller.v1/control/podgroup_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
schedulerpluginsv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
Expand All @@ -33,20 +34,22 @@ import (
// PodGroupControlInterface is an interface that knows how to add or delete PodGroups
// created as an interface to allow testing.
type PodGroupControlInterface interface {
// NewEmptyPodGroup returns an empty PodGroup
// NewEmptyPodGroup returns an empty PodGroup.
NewEmptyPodGroup() client.Object
// GetPodGroup gets the PodGroup identified by namespace and name
// GetPodGroup gets the PodGroup identified by namespace and name.
GetPodGroup(namespace string, name string) (metav1.Object, error)
// DeletePodGroup deletes the PodGroup identified by namespace and name.
DeletePodGroup(namespace string, name string) error
// UpdatePodGroup updates a PodGroup.
UpdatePodGroup(podGroup client.Object) error
// CreatePodGroup creates a new PodGroup with PodGroup spec fill function.
CreatePodGroup(podGroup client.Object) error
// DelayPodCreationDueToPodGroup determines whether it should delay Pod Creation.
DelayPodCreationDueToPodGroup(pg metav1.Object) bool
// DecoratePodTemplateSpec decorates PodTemplateSpec.
// If the PodTemplateSpec has SchedulerName set, this method will Not override
// If the PodTemplateSpec has SchedulerName set, this method will Not override.
DecoratePodTemplateSpec(pts *corev1.PodTemplateSpec, job metav1.Object, rtype string)
// GetSchedulerName returns the name of the gang scheduler
// GetSchedulerName returns the name of the gang scheduler.
GetSchedulerName() string
}

Expand Down Expand Up @@ -99,6 +102,15 @@ func (v *VolcanoControl) DeletePodGroup(namespace string, name string) error {
return v.Client.SchedulingV1beta1().PodGroups(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
}

func (v *VolcanoControl) UpdatePodGroup(podGroup client.Object) error {
pg := podGroup.(*volcanov1beta1.PodGroup)
_, err := v.Client.SchedulingV1beta1().PodGroups(pg.GetNamespace()).Update(context.TODO(), pg, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("unable to update a PodGroup, '%v': %v", klog.KObj(pg), err)
}
return nil
}

func (v *VolcanoControl) CreatePodGroup(podGroup client.Object) error {
pg := podGroup.(*volcanov1beta1.PodGroup)
createPodGroup, err := v.Client.SchedulingV1beta1().PodGroups(pg.GetNamespace()).Create(context.TODO(), pg, metav1.CreateOptions{})
Expand Down Expand Up @@ -162,9 +174,22 @@ func (s *SchedulerPluginsControl) DeletePodGroup(namespace, name string) error {
return s.Client.Delete(ctx, pg)
}

func (s *SchedulerPluginsControl) UpdatePodGroup(podGroup client.Object) error {
pg := podGroup.(*schedulerpluginsv1alpha1.PodGroup)
err := s.Client.Update(context.TODO(), pg, &client.UpdateOptions{})
if err != nil {
return fmt.Errorf("unable to update a PodGroup, '%v': %v", klog.KObj(pg), err)
}
return nil
}

func (s *SchedulerPluginsControl) CreatePodGroup(podGroup client.Object) error {
ctx := context.TODO()
return s.Client.Create(ctx, podGroup)
pg := podGroup.(*schedulerpluginsv1alpha1.PodGroup)
err := s.Client.Create(context.TODO(), pg, &client.CreateOptions{})
if err != nil {
return fmt.Errorf("unable to create a PodGroup, '%v': %v", klog.KObj(pg), err)
}
return nil
}

var _ PodGroupControlInterface = &SchedulerPluginsControl{}

0 comments on commit 9309adf

Please sign in to comment.