Skip to content
2 changes: 1 addition & 1 deletion apiserver/pkg/server/ray_job_submission_service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type RayJobSubmissionServiceServer struct {
// NewRayJobSubmissionServiceServer builds a RayJobSubmissionServiceServer from
// the given cluster server and options. The server logs at debug level to
// stdout under the logger name "jobsubmissionservice".
//
// The dashboard client factory is created with a nil manager and with the
// Kubernetes proxy disabled; the three-argument form additionally passes a nil
// job-info cache map.
// NOTE(review): with a nil cache map, clients from this factory have no job-info
// cache — confirm this server never relies on the cache-backed client methods.
func NewRayJobSubmissionServiceServer(clusterServer *ClusterServer, options *RayJobSubmissionServiceServerOptions) *RayJobSubmissionServiceServer {
zl := zerolog.New(os.Stdout).Level(zerolog.DebugLevel)
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false)}
return &RayJobSubmissionServiceServer{clusterServer: clusterServer, options: options, log: zerologr.New(&zl).WithName("jobsubmissionservice"), dashboardClientFunc: utils.GetRayDashboardClientFunc(nil, false, nil)}
}

// Submit Ray job
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ require (
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package v1alpha1

import (
cmap "github.com/orcaman/concurrent-map/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

//+kubebuilder:object:root=true
Expand Down Expand Up @@ -86,7 +88,8 @@ type Configuration struct {
}

// GetDashboardClient returns a factory that builds Ray dashboard clients for a
// given RayCluster and dashboard URL, honouring config.UseKubernetesProxy.
//
// In the three-argument form a fresh concurrent job-info map is allocated on
// every call and handed to the factory, so the cache is shared only between
// clients produced by the same returned factory.
func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
// The map is created here rather than inside the factory so every client the
// factory produces reads and writes the same cache.
jobInfoMap := cmap.New[*utiltypes.RayJobCache]()
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy, &jobInfoMap)
}

func (config Configuration) GetHttpProxyClient(mgr manager.Manager) func(hostIp, podNamespace, podName string, port int) utils.RayHttpProxyClientInterface {
Expand Down
32 changes: 24 additions & 8 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/ray-project/kuberay/ray-operator/controllers/ray/metrics"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
"github.com/ray-project/kuberay/ray-operator/pkg/features"
)

Expand All @@ -41,9 +42,8 @@ const (
// RayJobReconciler reconciles a RayJob object.
// It embeds the controller-runtime client and carries the scheme, event
// recorder, dashboard-client factory and reconciler options it works with.
type RayJobReconciler struct {
client.Client
Scheme *runtime.Scheme
Recorder record.EventRecorder

Scheme *runtime.Scheme
Recorder record.EventRecorder
// dashboardClientFunc builds a dashboard client for a RayCluster and dashboard URL.
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error)
options RayJobReconcilerOptions
}
Expand Down Expand Up @@ -275,10 +275,24 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
// If the Ray job was not found, GetJobInfo returns a BadRequest error.
var jobInfo *utiltypes.RayJobInfo
jobCache := rayDashboardClient.GetJobInfoFromCache(rayJobInstance.Status.JobId)
if jobCache != nil {
if jobCache.Err != nil {
if errors.IsBadRequest(jobCache.Err) && isSubmitterFinished {
rayJobInstance.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
rayJobInstance.Status.Reason = rayv1.AppFailed
rayJobInstance.Status.Message = "Submitter completed but Ray job not found in RayCluster."
break
}
logger.Error(jobCache.Err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId, "Error", jobCache.Err)
rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, jobCache.Err
}
jobInfo = jobCache.JobInfo
} else {
// Cache miss: try a direct fetch to disambiguate not-found vs. transient
jobInfo, err = rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if errors.IsBadRequest(err) {
if rayJobInstance.Spec.SubmissionMode == rayv1.HTTPMode {
logger.Info("The Ray job was not found. Submit a Ray job via an HTTP request.", "JobId", rayJobInstance.Status.JobId)
Expand All @@ -295,11 +309,13 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
break
}
}
}

rayDashboardClient.AsyncGetJobInfo(ctx, rayJobInstance.Status.JobId)
if jobInfo == nil {
logger.Error(err, "Failed to get job info", "JobId", rayJobInstance.Status.JobId)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

// If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job
// to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob
// as "Complete" or "Failed" to avoid unnecessary reconciliation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"strings"

cmap "github.com/orcaman/concurrent-map/v2"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"
Expand All @@ -27,28 +28,36 @@ var (
)

// RayDashboardClientInterface is the set of Ray dashboard operations used by
// the controllers: Serve deployment management, job submission and control,
// and job-info retrieval (direct, asynchronous, and from cache).
type RayDashboardClientInterface interface {
InitClient(client *http.Client, dashboardURL string)
// InitClient wires the HTTP client and dashboard URL together with the worker
// pool, the job-info cache, and the set of job IDs that have a fetch in flight.
InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}])
UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*utiltypes.ServeDetails, error)
GetMultiApplicationStatus(context.Context) (map[string]*utiltypes.ServeApplicationStatus, error)
GetJobInfo(ctx context.Context, jobId string) (*utiltypes.RayJobInfo, error)
// AsyncGetJobInfo requests a background fetch of the job's info; it returns no
// result, the outcome is read back through GetJobInfoFromCache.
AsyncGetJobInfo(ctx context.Context, jobId string)
ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error)
SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error)
SubmitJobReq(ctx context.Context, request *utiltypes.RayJobRequest) (string, error)
GetJobLog(ctx context.Context, jobName string) (*string, error)
StopJob(ctx context.Context, jobName string) error
DeleteJob(ctx context.Context, jobName string) error
// GetJobInfoFromCache returns the cached entry for jobId, or nil when there is none.
GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache
}

// RayDashboardClient talks to a Ray dashboard over HTTP. All fields are
// populated by InitClient.
type RayDashboardClient struct {
client *http.Client
dashboardURL string
client *http.Client
// workerPoolChannelContent holds the job IDs that currently have an
// asynchronous job-info fetch queued or running; entries are removed when the
// fetch finishes.
workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}]
// workerPool executes the asynchronous job-info fetches.
workerPool *WorkerPool
// jobInfoMap caches the outcome of the most recent asynchronous fetch per job ID.
jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]
dashboardURL string
}

// InitClient stores the HTTP client, dashboard base URL, worker pool, job-info
// cache and in-flight job set on the receiver. Nothing is validated or copied:
// the pointers are kept as given, and callers that only submit jobs pass nil
// for the worker pool and both maps.
func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string) {
func (r *RayDashboardClient) InitClient(client *http.Client, dashboardURL string, workerPool *WorkerPool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], workerPoolChannelContent *cmap.ConcurrentMap[string, struct{}]) {
r.client = client
r.dashboardURL = dashboardURL
r.workerPool = workerPool
r.jobInfoMap = jobInfoMap
r.workerPoolChannelContent = workerPoolChannelContent
}

// UpdateDeployments update the deployments in the Ray cluster.
Expand Down Expand Up @@ -171,6 +180,25 @@ func (r *RayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (*uti
return &jobInfo, nil
}

// AsyncGetJobInfo schedules a background fetch of jobId's info on the worker
// pool. The outcome is written to the job-info cache, either as the job info or
// as the wrapped fetch error, and can be read back with GetJobInfoFromCache.
//
// At most one fetch per job ID is queued or running at a time; a call made
// while one is in flight returns immediately. The call is a no-op when the
// client was initialised without a worker pool, cache, or in-flight set. If ctx
// is cancelled before a worker accepts the task, the request is dropped.
func (r *RayDashboardClient) AsyncGetJobInfo(ctx context.Context, jobId string) {
	// InitClient accepts nil for all three (job submitters do exactly that), so
	// guard here instead of panicking on a nil dereference.
	if r.workerPool == nil || r.jobInfoMap == nil || r.workerPoolChannelContent == nil {
		return
	}

	// Claim the in-flight slot atomically. A separate Get followed by Set would
	// let two concurrent callers both pass the check and enqueue duplicate fetches.
	if !r.workerPoolChannelContent.SetIfAbsent(jobId, struct{}{}) {
		return
	}

	fetch := func() {
		// Release the slot whatever the outcome so later calls can refresh the cache.
		defer r.workerPoolChannelContent.Remove(jobId)

		jobInfo, err := r.GetJobInfo(ctx, jobId)
		if err != nil {
			// %w keeps the original error in the chain for callers that classify it.
			err = fmt.Errorf("failed to get job info: %w", err)
			r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: nil, Err: err})
			return
		}
		if jobInfo != nil {
			r.jobInfoMap.Set(jobId, &utiltypes.RayJobCache{JobInfo: jobInfo, Err: nil})
		}
	}

	// The send blocks until a worker is free; give up if the caller's context
	// ends first so the caller is never stuck behind a saturated pool.
	select {
	case r.workerPool.taskQueue <- fetch:
	case <-ctx.Done():
		// The task was never queued, so its deferred Remove will not run.
		r.workerPoolChannelContent.Remove(jobId)
	}
}

func (r *RayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.dashboardURL+JobPath, nil)
if err != nil {
Expand Down Expand Up @@ -221,6 +249,7 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *utiltype
}

req.Header.Set("Content-Type", "application/json")

resp, err := r.client.Do(req)
if err != nil {
return
Expand Down Expand Up @@ -333,6 +362,13 @@ func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) erro
return nil
}

// GetJobInfoFromCache returns the cached result of the most recent asynchronous
// job-info fetch for jobId. It returns nil when nothing is cached for that job
// or when the client was initialised without a cache.
func (r *RayDashboardClient) GetJobInfoFromCache(jobId string) *utiltypes.RayJobCache {
	// InitClient may be given a nil map; treat that as an always-empty cache
	// rather than dereferencing the nil pointer.
	if r.jobInfoMap == nil {
		return nil
	}
	cached, found := r.jobInfoMap.Get(jobId)
	if !found {
		return nil
	}
	return cached
}

func ConvertRayJobToReq(rayJob *rayv1.RayJob) (*utiltypes.RayJobRequest, error) {
req := &utiltypes.RayJobRequest{
Entrypoint: rayJob.Spec.Entrypoint,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package dashboardclient

import (
"sync"
)

// WorkerPool runs submitted tasks on a fixed number of goroutines.
type WorkerPool struct {
// taskQueue delivers tasks to the worker goroutines.
taskQueue chan func()
// stopChan makes a worker return as soon as a receive on it succeeds.
stopChan chan struct{}
// wg counts running workers: incremented before each launch, decremented when
// a worker returns.
wg sync.WaitGroup
// workers is the number of goroutines launched by start.
workers int
}

// NewWorkerPool creates a WorkerPool that consumes tasks from the supplied
// taskQueue and starts its worker goroutines before returning. The pool keeps
// the channel it is given; it does not create or buffer one itself.
func NewWorkerPool(taskQueue chan func()) *WorkerPool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func NewWorkerPool(taskQueue chan func()) *WorkerPool {
func NewWorkerPool(workers int) *WorkerPool {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Passing a task queue channel is weird. Specifying a worker count is more understandable. You can also make a buffered channel based on the worker count internally.

wp := &WorkerPool{
taskQueue: taskQueue,
// The worker count is fixed at 10; callers cannot configure it.
workers: 10,
stopChan: make(chan struct{}),
}

// Start workers immediately
wp.start()
return wp
}

// start spins up wp.workers goroutines, each running wp.worker. Every goroutine
// is registered with wp.wg before it is launched.
func (wp *WorkerPool) start() {
	for remaining := wp.workers; remaining > 0; remaining-- {
		wp.wg.Add(1)
		go wp.worker()
	}
}

// worker is the loop run by each pool goroutine. It executes tasks received
// from the task queue until a receive on stopChan succeeds or the task queue is
// closed, and marks itself done on wp.wg when it returns. Nil tasks are skipped.
func (wp *WorkerPool) worker() {
	defer wp.wg.Done()

	for {
		select {
		case <-wp.stopChan:
			return
		case task, ok := <-wp.taskQueue:
			if !ok {
				// A closed channel is always ready and yields nil, so without
				// this exit the loop would spin at full CPU forever.
				return
			}
			if task != nil {
				task() // Execute the job
			}
		}
	}
}
11 changes: 10 additions & 1 deletion ray-operator/controllers/ray/utils/fake_serve_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"net/http"
"sync/atomic"

cmap "github.com/orcaman/concurrent-map/v2"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
Expand All @@ -19,7 +21,7 @@ type FakeRayDashboardClient struct {

var _ dashboardclient.RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)

// InitClient is a no-op: the fake ignores every argument and keeps no HTTP
// client, worker pool or cache.
func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string) {
func (r *FakeRayDashboardClient) InitClient(_ *http.Client, _ string, _ *dashboardclient.WorkerPool, _ *cmap.ConcurrentMap[string, *utiltypes.RayJobCache], _ *cmap.ConcurrentMap[string, struct{}]) {
}

func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error {
Expand All @@ -46,6 +48,9 @@ func (r *FakeRayDashboardClient) GetJobInfo(ctx context.Context, jobId string) (
return &utiltypes.RayJobInfo{JobStatus: rayv1.JobStatusRunning}, nil
}

// AsyncGetJobInfo is a no-op: the fake schedules no background fetch and
// populates no cache.
func (r *FakeRayDashboardClient) AsyncGetJobInfo(_ context.Context, _ string) {
}

func (r *FakeRayDashboardClient) ListJobs(ctx context.Context) (*[]utiltypes.RayJobInfo, error) {
if mock := r.GetJobInfoMock.Load(); mock != nil {
info, err := (*mock)(ctx, "job_id")
Expand Down Expand Up @@ -77,3 +82,7 @@ func (r *FakeRayDashboardClient) StopJob(_ context.Context, _ string) (err error
func (r *FakeRayDashboardClient) DeleteJob(_ context.Context, _ string) error {
return nil
}

// GetJobInfoFromCache always returns nil, so the fake reports a cache miss for
// every job ID.
func (r *FakeRayDashboardClient) GetJobInfoFromCache(_ string) *utiltypes.RayJobCache {
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,8 @@ type RayJobStopResponse struct {
type RayJobLogsResponse struct {
Logs string `json:"logs,omitempty"`
}

// RayJobCache is the cached outcome of one job-info fetch. The dashboard
// client's asynchronous fetch stores either JobInfo with a nil Err, or a nil
// JobInfo with the fetch error.
type RayJobCache struct {
// JobInfo is the fetched job info; nil when the fetch failed.
JobInfo *RayJobInfo
// Err is the error from the fetch; nil when the fetch succeeded.
Err error
}
12 changes: 10 additions & 2 deletions ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"
"unicode"

cmap "github.com/orcaman/concurrent-map/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -27,6 +28,7 @@ import (

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/dashboardclient"
utiltypes "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils/types"
)

const (
Expand Down Expand Up @@ -758,7 +760,10 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
return headServiceURL, nil
}

func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool, jobInfoMap *cmap.ConcurrentMap[string, *utiltypes.RayJobCache]) func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
taskQueue := make(chan func())
workerPool := dashboardclient.NewWorkerPool(taskQueue)
workerPoolChannelContent := cmap.New[struct{}]()
return func(rayCluster *rayv1.RayCluster, url string) (dashboardclient.RayDashboardClientInterface, error) {
dashboardClient := &dashboardclient.RayDashboardClient{}
if useKubernetesProxy {
Expand All @@ -777,13 +782,16 @@ func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) fun
// configured to communicate with the Kubernetes API server.
mgr.GetHTTPClient(),
fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
workerPool,
jobInfoMap,
&workerPoolChannelContent,
)
return dashboardClient, nil
}

dashboardClient.InitClient(&http.Client{
Timeout: 2 * time.Second,
}, "http://"+url)
}, "http://"+url, workerPool, jobInfoMap, &workerPoolChannelContent)
return dashboardClient, nil
}
}
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/rayjob-submitter/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func main() {
}
rayDashboardClient := &dashboardclient.RayDashboardClient{}
address = rayjobsubmitter.JobSubmissionURL(address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address)
rayDashboardClient.InitClient(&http.Client{Timeout: time.Second * 10}, address, nil, nil, nil)
submissionId, err := rayDashboardClient.SubmitJobReq(context.Background(), &req)
if err != nil {
if strings.Contains(err.Error(), "Please use a different submission_id") {
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/test/e2erayjob/rayjob_retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestRayJobRetry(t *testing.T) {
LogWithTimestamp(test.T(), "Waiting for RayJob %s/%s to complete", rayJob.Namespace, rayJob.Name)

// Ensure JobDeploymentStatus transit to Failed
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutLong).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusFailed)))
// Ensure JobStatus is empty
g.Expect(GetRayJob(test, rayJob.Namespace, rayJob.Name)).
Expand Down
2 changes: 1 addition & 1 deletion ray-operator/test/sampleyaml/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg

g.Expect(err).ToNot(HaveOccurred())
url := fmt.Sprintf("127.0.0.1:%d", localPort)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false, nil)
rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url)
g.Expect(err).ToNot(HaveOccurred())
serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())
Expand Down
Loading