2 changes: 1 addition & 1 deletion apiserver/pkg/server/ray_job_submission_service_server.go
@@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct {
// Create RayJobSubmissionServiceServer
func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer {
zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel)
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)}
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, nil)}
}

// Submit Ray job
1 change: 1 addition & 0 deletions go.mod
@@ -82,6 +82,7 @@ require (
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
@@ -1,13 +1,15 @@
package v1alpha1

import (
cmap "github.com/orcaman/concurrent-map/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

//+kubebuilder:object:root=true
@@ -86,7 +88,8 @@ type Configuration struct {
}

func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
jobInfoMap := cmap.New[*utiltypes.RayJobCache]()
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, &jobInfoMap)
}

func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {
25 changes: 18 additions & 7 deletions ray-operator/controllers/ray/rayjob_controller.go
@@ -30,6 +30,7 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
)

@@ -41,9 +42,8 @@ const (
// RayJobReconciler reconciles a RayJob object
type RayJobReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

Scheme *runtime.Scheme
Recorder record.EventRecorder
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
options RayJobReconcilerOptions
}
@@ -275,9 +275,15 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
var jobInfo *utiltypes.RayJobInfo
if loadedJobCache := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId); loadedJobCache != nil {
if loadedJobCache.Err != nil {
rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
logger.Error(loadedJobCache.Err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId, "Error", loadedJobCache.Err)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, loadedJobCache.Err
}
jobInfo = loadedJobCache.JobInfo
} else {
// If the Ray job was not found, GetJobInfo returns a BadRequest error.
if errors.IsBadRequest(err) {
if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode {
@@ -295,11 +301,16 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}
}
logger.Info("Job info not found in map", "JobId", rayJobInstance.Status.JobId)
rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
if jobInfo == nil {
logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

// If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job
// to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob
// as "Complete" or "Failed" to avoid unnecessary reconciliation.
@@ -9,6 +9,7 @@ import (
"net/url"
"strings"

cmap "github.com/orcaman/concurrent-map/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"
@@ -27,28 +28,34 @@ var (
)

type RayDashboardClientInterface interface {
InitClient(client *http.Client, dashboardURL string)
InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache])
rueian (Collaborator), Oct 10, 2025:
Suggested change:
-	InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache])
+	InitClient(client *http.Client, dashboardURL string)
Hide the jobInfoMap and workerPool implementation details.

Author (Member):
Is it okay to keep it, given that the controller does not call InitClient directly? The controller currently creates a new DashboardClient on every reconciliation, so if the creation of the workerPool and cmap moved into InitClient, they would be recreated on every reconciliation, which is not what we want. The current GetRayDashboardClientFunc acts kind of like a factory.

Collaborator:
Couldn't they be created once in GetRayDashboardClientFunc? Just like what you did with the workerPool.

(A sketch of this create-once-in-the-factory approach appears after the interface definition below.)

UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error)
GetMultiApplicationStatus(context.Context) (map[string]*utiltypes.ServeApplicationStatus, error)
GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error)
AsyncGetJobInfo(ctx context.Context, jobId string)
ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error)
SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error)
SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error)
GetJobLog(ctx context.Context, jobName string) (*string, error)
StopJob(ctx context.Context, jobName string) error
DeleteJob(ctx context.Context, jobName string) error
GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache
}
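
As referenced in the thread above, a minimal sketch of the create-once-in-the-factory alternative: InitClient keeps its original two-argument signature, and GetRayDashboardClientFunc builds the shared worker pool and job-info cache exactly once when the factory is created. The SetSharedState setter is hypothetical (it is not part of this PR), and the useKubernetesProxy branch is omitted for brevity; the other names follow the PR.

package utils

import (
	"net/http"
	"time"

	cmap "github.com/orcaman/concurrent-map/v2"
	"sigs.k8s.io/controller-runtime/pkg/manager"

	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
	utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
	// Created once when the factory is built; shared by every client the closure returns.
	workerPool := dashboardclient.NewWorkerPool(make(chan func()))
	jobInfoMap := cmap.New[*utiltypes.RayJobCache]()
	return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
		// The useKubernetesProxy branch is omitted in this sketch.
		dashboardClient := &dashboardclient.RayDashboardClient{}
		// Original two-argument InitClient; the pool and cache are injected separately.
		dashboardClient.InitClient(&http.Client{Timeout: 2 * time.Second}, "http://"+url)
		// Hypothetical setter that hides the workerPool/jobInfoMap wiring from callers.
		dashboardClient.SetSharedState(workerPool, &jobInfoMap)
		return dashboardClient, nil
	}
}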

type RayDashboardClient struct {
client *http.Client
workerPool *WorkerPool
jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]
dashboardURL string
}

func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string) {
func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) {
r.client = client
r.dashboardURL = dashboardURL
r.workerPool = workerPool
r.jobInfoMap = jobInfoMap
}

// UpdateDeployments update the deployments in the Ray cluster.
@@ -171,6 +178,25 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti
return &jobInfo, nil
}

func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) {
if _, ok := r.workerPool.channelContent.Get(jobId); ok {
return
}
r.workerPool.channelContent.Set(jobId, struct{}{})
r.workerPool.taskQueue <- func() {
defer r.workerPool.channelContent.Remove(jobId)
jobInfo, err := r.GetJobInfo(ctx, jobId)
if err != nil {
err = fmt.Errorf("failed to get job info: %w", err)
r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: nil, Err: err})
return
}
if jobInfo != nil {
r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: jobInfo, Err: nil})
}
}
}

func (r *RayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.dashboardURL+JobPath, nil)
if err != nil {
@@ -221,6 +247,7 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *utiltype
}

req.Header.Set("Content-Type", "application/json")

resp, err := r.client.Do(req)
if err != nil {
return
@@ -333,6 +360,13 @@ func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) erro
return nil
}

func (r *RayDashboardClient) GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache {
if jobInfo, ok := r.jobInfoMap.Get(jobId); ok {
return jobInfo
}
return nil
}

func ConvertRayJobToReq(rayJob *rayv1.RayJob) (*utiltypes.RayJobRequest, error) {
req := &utiltypes.RayJobRequest{
Entrypoint: rayJob.Spec.Entrypoint,
@@ -0,0 +1,58 @@
package dashboardclient

import (
"sync"

cmap "github.com/orcaman/concurrent-map/v2"
)

type WorkerPool struct {
channelContent cmap.ConcurrentMap[string, struct{}]
Collaborator:
Move this to the dashboard client.

taskQueue chan func()
stop chan struct{}
wg sync.WaitGroup
workers int
}

func NewWorkerPool(taskQueue chan func()) *WorkerPool {
Collaborator:
Suggested change:
-	func NewWorkerPool(taskQueue chan func()) *WorkerPool {
+	func NewWorkerPool(workers int) *WorkerPool {

Collaborator:
Passing a task queue channel is weird. Specifying a worker count is more understandable. You can also make a buffered channel based on the worker count internally.
(A sketch along these lines appears after this file.)

wp := &WorkerPool{
taskQueue: taskQueue,
workers: 10,
stop: make(chan struct{}),
channelContent: cmap.New[struct{}](),
}

// Start workers immediately
wp.Start()
return wp
}

// Start launches worker goroutines to consume from queue
func (wp *WorkerPool) Start() {
Collaborator:
Make this private.

for i := 0; i < wp.workers; i++ {
wp.wg.Add(1)
go wp.worker()
}
}

// worker consumes and executes tasks from the queue
func (wp *WorkerPool) worker() {
defer wp.wg.Done()

for {
select {
case <-wp.stop:
return
case task := <-wp.taskQueue:
if task != nil {
task() // Execute the job
}
}
}
}

// Stop shuts down all workers
func (wp *WorkerPool) Stop() {
close(wp.stop)
wp.wg.Wait()
}
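
Following the suggestions in the threads above, a minimal sketch of the alternative shape for this file: the caller passes a worker count, the pool builds its own buffered task queue, start is unexported, and the per-job deduplication map (channelContent) is left to the dashboard client rather than the pool. This is a sketch of the review suggestions, not code from the PR.

package dashboardclient

import "sync"

type WorkerPool struct {
	taskQueue chan func()
	stop      chan struct{}
	wg        sync.WaitGroup
	workers   int
}

// NewWorkerPool builds a pool with the given worker count and a task queue
// buffered to the same size, then starts the workers immediately.
func NewWorkerPool(workers int) *WorkerPool {
	wp := &WorkerPool{
		taskQueue: make(chan func(), workers), // buffered based on the worker count
		stop:      make(chan struct{}),
		workers:   workers,
	}
	wp.start()
	return wp
}

// start launches the worker goroutines; unexported, as suggested in the review.
func (wp *WorkerPool) start() {
	for i := 0; i < wp.workers; i++ {
		wp.wg.Add(1)
		go wp.worker()
	}
}

// worker consumes and executes tasks until Stop is called.
func (wp *WorkerPool) worker() {
	defer wp.wg.Done()
	for {
		select {
		case <-wp.stop:
			return
		case task := <-wp.taskQueue:
			if task != nil {
				task()
			}
		}
	}
}

// Stop signals all workers to exit and waits for them to return.
func (wp *WorkerPool) Stop() {
	close(wp.stop)
	wp.wg.Wait()
}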
11 changes: 10 additions & 1 deletion ray-operator/controllers/ray/utils/fake_serve_httpclient.go
@@ -6,6 +6,8 @@ import (
"net/http"
"sync/atomic"

cmap "github.com/orcaman/concurrent-map/v2"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
@@ -19,7 +21,7 @@ type FakeRayDashboardClient struct {

var _ dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)

func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string) {
func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) {
}

func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error {
@@ -46,6 +48,9 @@ func (r *FakeRayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (
return &utiltypes.RayJobInfo{JobStatus: rayv1.JobStatusRunning}, nil
}

func (r *FakeRayDashboardClient) AsyncGetJobInfo(_ context.Context, _ string) {
}

func (r *FakeRayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
if mock := r.GetJobInfoMock.Load(); mock != nil {
info, err := (*mock)(ctx, "job_id")
@@ -77,3 +82,7 @@ func (r *FakeRayDashboardClient) StopJob(_ context.Context, _ string) (err error
func (r *FakeRayDashboardClient) DeleteJob(_ context.Context, _ string) error {
return nil
}

func (r *FakeRayDashboardClient) GetJobInfoFromCache(_ string) *utiltypes.RayJobCache {
return nil
}
@@ -46,3 +46,8 @@ type RayJobStopResponse struct {
type RayJobLogsResponse struct {
Logs string `json:"logs,omitempty"`
}

type RayJobCache struct {
JobInfo *RayJobInfo
Err error
}
10 changes: 8 additions & 2 deletions ray-operator/controllers/ray/utils/util.go
@@ -14,6 +14,7 @@ import (
"time"
"unicode"

cmap "github.com/orcaman/concurrent-map/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -27,6 +28,7 @@ import (

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

const (
@@ -758,7 +760,9 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
return headServiceURL, nil
}

func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
taskQueue := make(chan func())
workerPool := dashboardclient.NewWorkerPool(taskQueue)
return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
dashboardClient := &dashboardclient.RayDashboardClient{}
if useKubernetesProxy {
@@ -777,13 +781,15 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
// configured to communicate with the Kubernetes API server.
mgr.GetHTTPClient(),
fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
workerPool,
jobInfoMap,
)
return dashboardClient, nil
}

dashboardClient.InitClient(&http.Client{
Timeout: 2 * time.Second,
}, "http://"+url)
}, "http://"+url, workerPool, jobInfoMap)
return dashboardClient, nil
}
}
2 changes: 1 addition & 1 deletion ray-operator/rayjob-submitter/cmd/main.go
@@ -64,7 +64,7 @@ func main() {
}
rayDashboardClient := &dashboardclient.RayDashboardClient{}
address = rayjobsubmitter.JobSubmissionURL(address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, nil, nil)
submissionId, err := rayDashboardClient.SubmitJobReq(context.Background(), &req)
if err != nil {
if strings.Contains(err.Error(), "Please use a different submission_id") {
2 changes: 1 addition & 1 deletion ray-operator/test/sampleyaml/support.go
@@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg

g.Expect(err).ToNot(HaveOccurred())
url := fmt.Sprintf("127.0.0.1:%d", localPort)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil)
rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url)
g.Expect(err).ToNot(HaveOccurred())
serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())