ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml (111 additions, 0 deletions)
@@ -0,0 +1,111 @@
apiVersion: scheduling.volcano.sh/v1beta1
kind: Queue
metadata:
  name: kuberay-test-queue
spec:
  weight: 1
  capability:
    cpu: 4
    memory: 6Gi
---
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample-0
  labels:
    ray.io/scheduler-name: volcano
    volcano.sh/queue-name: kuberay-test-queue
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"
  rayClusterSpec:
    rayVersion: '2.46.0'
    headGroupSpec:
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.46.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                  memory: "2Gi"
                requests:
                  cpu: "1"
                  memory: "2Gi"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                name: ray-job-code-sample
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 2
        minReplicas: 2
        maxReplicas: 2
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray:2.46.0
                resources:
                  limits:
                    cpu: "1"
                    memory: "1Gi"
                  requests:
                    cpu: "1"
                    memory: "1Gi"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"
@@ -15,16 +15,18 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
volcanov1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

const (
PodGroupName = "podgroups.scheduling.volcano.sh"

Comment (Collaborator Author): This variable is unused and can be removed, I think.

Reply (Collaborator): I think we can just remove it.

pluginName = "volcano"
QueueNameLabelKey = "volcano.sh/queue-name"
)

@@ -34,106 +36,189 @@ type VolcanoBatchScheduler struct {

type VolcanoBatchSchedulerFactory struct{}

func GetPluginName() string {
return "volcano"
}
func GetPluginName() string { return pluginName }

func (v *VolcanoBatchScheduler) Name() string {
return GetPluginName()
}

func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error {
app, ok := object.(*rayv1.RayCluster)
if !ok {
return fmt.Errorf("currently only RayCluster is supported, got %T", object)
}
var minMember int32
var totalResource corev1.ResourceList
if !utils.IsAutoscalingEnabled(&app.Spec) {
minMember = utils.CalculateDesiredReplicas(ctx, app) + 1
totalResource = utils.CalculateDesiredResources(app)
} else {
minMember = utils.CalculateMinReplicas(app) + 1
totalResource = utils.CalculateMinResources(app)
switch obj := object.(type) {
case *rayv1.RayCluster:
return v.handleRayCluster(ctx, obj)
case *rayv1.RayJob:
return v.handleRayJob(ctx, obj)
default:
return fmt.Errorf("unsupported object type %T, only RayCluster and RayJob are supported", object)
}
}

// handleRayCluster calculates the PodGroup MinMember and MinResources for a RayCluster
func (v *VolcanoBatchScheduler) handleRayCluster(ctx context.Context, raycluster *rayv1.RayCluster) error {
// Check if this RayCluster is created by a RayJob, if so, skip PodGroup creation
if crdType, ok := raycluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok && crdType == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) {
return nil
}

return v.syncPodGroup(ctx, app, minMember, totalResource)
minMember, totalResource := v.calculatePodGroupParams(ctx, &raycluster.Spec)

return v.syncPodGroup(ctx, raycluster, minMember, totalResource)
}

func getAppPodGroupName(app *rayv1.RayCluster) string {
return fmt.Sprintf("ray-%s-pg", app.Name)
// handleRayJob calculates the PodGroup MinMember and MinResources for a RayJob
func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1.RayJob) error {
if rayJob.Spec.RayClusterSpec == nil {
return fmt.Errorf("gang scheduling does not support RayJob %s/%s referencing an existing RayCluster", rayJob.Namespace, rayJob.Name)
}

var totalResourceList []corev1.ResourceList
minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec)
totalResourceList = append(totalResourceList, totalResource)

// MinMember intentionally excludes the submitter pod to avoid a startup deadlock
// (submitter waits for cluster; gang would wait for submitter). We still add the
// submitter's resource requests into MinResources so capacity is reserved.
if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode {
submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec)
submitterResource := utils.CalculatePodResource(submitterTemplate.Spec)
totalResourceList = append(totalResourceList, submitterResource)
}

return v.syncPodGroup(ctx, rayJob, minMember, utils.SumResourceList(totalResourceList))
}
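
For a concrete sense of what handleRayJob produces, here is a rough standalone sketch (not part of this PR) of the gang sizing for the sample RayJob at the top of this diff (one head at 1 CPU/2Gi, two workers at 1 CPU/1Gi, K8sJobMode); the submitter requests below are made-up placeholders, not values taken from KubeRay's default submitter template:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Pod-level requests from the sample RayJob above.
	head := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("2Gi")}
	worker := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("1"), corev1.ResourceMemory: resource.MustParse("1Gi")}
	// Placeholder submitter requests (illustrative only).
	submitter := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("500m"), corev1.ResourceMemory: resource.MustParse("200Mi")}

	// MinMember counts head + workers only; the submitter is intentionally excluded.
	minMember := 1 + 2

	// MinResources still reserves capacity for the submitter pod.
	total := corev1.ResourceList{}
	for _, rl := range []corev1.ResourceList{head, worker, worker, submitter} {
		for name, q := range rl {
			sum := total[name]
			sum.Add(q)
			total[name] = sum
		}
	}

	fmt.Println("MinMember:", minMember)                      // 3
	fmt.Println("MinResources:", total.Cpu(), total.Memory()) // 3500m 4296Mi
}

In other words, the PodGroup is sized so that the head, both workers, and the submitter all fit in the queue, while only the Ray pods have to gang-start.
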

func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, app *rayv1.RayCluster, size int32, totalResource corev1.ResourceList) error {
logger := ctrl.LoggerFrom(ctx).WithName(v.Name())
func getAppPodGroupName(object metav1.Object) string {
// Prefer the RayJob name if this object originated from a RayJob

Comment (Collaborator): Why do we need this? IIUC, if the object is a RayCluster, it won't have originated from a RayJob, since we early-return at line 60, right?

Reply (Collaborator Author): If the RayCluster originated from a RayJob, line 226 (same as populateAnnotations) still retrieves the name from the RayCluster at execution time, but the PodGroup was created using the RayJob's name. That's why getAppPodGroupName needs to check whether the RayCluster originated from a RayJob.

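
To make that scenario concrete, here is a hypothetical test-style sketch (not part of this PR), assuming it lives in this volcano package with the usual testing, metav1, rayv1, and utils imports; the generated cluster name is invented for illustration:

func TestPodGroupNameForRayJobOwnedCluster(t *testing.T) {
	cluster := &rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			// Hypothetical generated name of the cluster created for the RayJob.
			Name: "rayjob-sample-0-raycluster-xxxxx",
			Labels: map[string]string{
				utils.RayOriginatedFromCRDLabelKey:    utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD),
				utils.RayOriginatedFromCRNameLabelKey: "rayjob-sample-0",
			},
		},
	}
	// The PodGroup was created under the RayJob's name, so the lookup must
	// resolve to "ray-rayjob-sample-0-pg" rather than the generated cluster name.
	if got, want := getAppPodGroupName(cluster), "ray-rayjob-sample-0-pg"; got != want {
		t.Errorf("getAppPodGroupName() = %q, want %q", got, want)
	}
}
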

name := object.GetName()
if labels := object.GetLabels(); labels != nil &&
labels[utils.RayOriginatedFromCRDLabelKey] == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) {
if rayJobName := labels[utils.RayOriginatedFromCRNameLabelKey]; rayJobName != "" {
name = rayJobName
}
}
return fmt.Sprintf("ray-%s-pg", name)
}

func addSchedulerName(obj metav1.Object, schedulerName string) {
switch obj := obj.(type) {
case *corev1.Pod:
obj.Spec.SchedulerName = schedulerName
case *corev1.PodTemplateSpec:
obj.Spec.SchedulerName = schedulerName
}
}

func populateAnnotations(parent metav1.Object, child metav1.Object, groupName string) {
annotations := child.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(parent)
annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName
child.SetAnnotations(annotations)
}

podGroupName := getAppPodGroupName(app)
podGroup := volcanov1beta1.PodGroup{}
if err := v.cli.Get(ctx, types.NamespacedName{Namespace: app.Namespace, Name: podGroupName}, &podGroup); err != nil {
func populateLabelsFromObject(parent metav1.Object, child metav1.Object, key string) {
labels := child.GetLabels()
if labels == nil {
labels = make(map[string]string)
}
if parentLabel, exist := parent.GetLabels()[key]; exist && parentLabel != "" {
labels[key] = parentLabel
}
child.SetLabels(labels)
}

// syncPodGroup ensures a Volcano PodGroup exists/updated for the given object
// with the provided size (MinMember) and total resources.
func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.Object, size int32, totalResource corev1.ResourceList) error {
logger := ctrl.LoggerFrom(ctx).WithName(pluginName)

podGroupName := getAppPodGroupName(owner)
podGroup := volcanoschedulingv1beta1.PodGroup{}
if err := v.cli.Get(ctx, types.NamespacedName{Namespace: owner.GetNamespace(), Name: podGroupName}, &podGroup); err != nil {
if !errors.IsNotFound(err) {
logger.Error(err, "failed to get PodGroup", "podGroupName", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace())
return err
}

podGroup := createPodGroup(app, podGroupName, size, totalResource)
podGroup := createPodGroup(owner, podGroupName, size, totalResource)
if err := v.cli.Create(ctx, &podGroup); err != nil {
if errors.IsAlreadyExists(err) {
logger.Info("pod group already exists, no need to create")
logger.Info("podGroup already exists, no need to create", "name", podGroupName)
return nil
}

logger.Error(err, "Pod group CREATE error!", "PodGroup.Error", err)
logger.Error(err, "failed to create PodGroup", "name", podGroupName)
return err
}
} else {
if podGroup.Spec.MinMember != size || !quotav1.Equals(*podGroup.Spec.MinResources, totalResource) {
if podGroup.Spec.MinMember != size || podGroup.Spec.MinResources == nil || !quotav1.Equals(*podGroup.Spec.MinResources, totalResource) {
podGroup.Spec.MinMember = size
podGroup.Spec.MinResources = &totalResource
if err := v.cli.Update(ctx, &podGroup); err != nil {
logger.Error(err, "Pod group UPDATE error!", "podGroup", podGroupName)
logger.Error(err, "failed to update PodGroup", "name", podGroupName)
return err
}
}
}
return nil
}

func createPodGroup(
app *rayv1.RayCluster,
podGroupName string,
size int32,
totalResource corev1.ResourceList,
) volcanov1beta1.PodGroup {
podGroup := volcanov1beta1.PodGroup{
func (v *VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, rayClusterSpec *rayv1.RayClusterSpec) (int32, corev1.ResourceList) {
rayCluster := &rayv1.RayCluster{Spec: *rayClusterSpec}

if !utils.IsAutoscalingEnabled(rayClusterSpec) {
return utils.CalculateDesiredReplicas(ctx, rayCluster) + 1, utils.CalculateDesiredResources(rayCluster)
}
return utils.CalculateMinReplicas(rayCluster) + 1, utils.CalculateMinResources(rayCluster)
}

func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup {
var ownerRef metav1.OwnerReference
switch obj := owner.(type) {
case *rayv1.RayCluster:
ownerRef = *metav1.NewControllerRef(obj, rayv1.SchemeGroupVersion.WithKind("RayCluster"))
case *rayv1.RayJob:
ownerRef = *metav1.NewControllerRef(obj, rayv1.SchemeGroupVersion.WithKind("RayJob"))
}

podGroup := volcanoschedulingv1beta1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Namespace: app.Namespace,
Name: podGroupName,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(app, rayv1.SchemeGroupVersion.WithKind("RayCluster")),
},
Namespace: owner.GetNamespace(),
Name: podGroupName,
OwnerReferences: []metav1.OwnerReference{ownerRef},
},
Spec: volcanov1beta1.PodGroupSpec{
Spec: volcanoschedulingv1beta1.PodGroupSpec{
MinMember: size,
MinResources: &totalResource,
},
Status: volcanov1beta1.PodGroupStatus{
Phase: volcanov1beta1.PodGroupPending,
Status: volcanoschedulingv1beta1.PodGroupStatus{
Phase: volcanoschedulingv1beta1.PodGroupPending,
},
}

if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok {
if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok {
podGroup.Spec.Queue = queue
}

if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok {
if priorityClassName, ok := owner.GetLabels()[utils.RayPriorityClassName]; ok {
podGroup.Spec.PriorityClassName = priorityClassName
}

return podGroup
}

func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, groupName string) {
populateLabelsFromObject(parent, child, QueueNameLabelKey)
populateLabelsFromObject(parent, child, utils.RayPriorityClassName)
populateAnnotations(parent, child, groupName)
addSchedulerName(child, v.Name())
}

// This function will be removed in interface migration PR
func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) {
pod.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app)
pod.Annotations[volcanov1alpha1.TaskSpecKey] = groupName
pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app)
pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName
if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok {
pod.Labels[QueueNameLabelKey] = queue
}
@@ -143,11 +228,8 @@ func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.R
pod.Spec.SchedulerName = v.Name()
}

func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
}

func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) {
if err := volcanov1beta1.AddToScheme(cli.Scheme()); err != nil {
if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil {
return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err)
}
return &VolcanoBatchScheduler{
@@ -156,9 +238,9 @@ func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, c
}

func (vf *VolcanoBatchSchedulerFactory) AddToScheme(scheme *runtime.Scheme) {
utilruntime.Must(volcanov1beta1.AddToScheme(scheme))
utilruntime.Must(volcanoschedulingv1beta1.AddToScheme(scheme))
}

func (vf *VolcanoBatchSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder {
return b.Owns(&volcanov1beta1.PodGroup{})
return b.Owns(&volcanoschedulingv1beta1.PodGroup{})
}