Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions apiserver/pkg/server/ray_job_submission_service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sigs.k8s.io/yaml"

api "github.com/ray-project/kuberay/proto/go_client"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

Expand All @@ -31,7 +32,7 @@ type RayJobSubmissionServiceServer struct {
api.UnimplementedRayJobSubmissionServiceServer
options *RayJobSubmissionServiceServerOptions
clusterServer *ClusterServer
dashboardClientFunc func() utils.RayDashboardClientInterface
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error)
log logr.Logger
}

Expand All @@ -49,9 +50,8 @@ func (s *RayJobSubmissionServiceServer) SubmitRayJob(ctx context.Context, req *a
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
if err != nil {
return nil, err
}
request := &utils.RayJobRequest{Entrypoint: req.Jobsubmission.Entrypoint}
Expand Down Expand Up @@ -104,9 +104,8 @@ func (s *RayJobSubmissionServiceServer) GetJobDetails(ctx context.Context, req *
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
if err != nil {
return nil, err
}
nodeInfo, err := rayDashboardClient.GetJobInfo(ctx, req.Submissionid)
Expand All @@ -127,9 +126,8 @@ func (s *RayJobSubmissionServiceServer) GetJobLog(ctx context.Context, req *api.
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
if err != nil {
return nil, err
}
jlog, err := rayDashboardClient.GetJobLog(ctx, req.Submissionid)
Expand All @@ -150,9 +148,8 @@ func (s *RayJobSubmissionServiceServer) ListJobDetails(ctx context.Context, req
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
if err != nil {
return nil, err
}
nodesInfo, err := rayDashboardClient.ListJobs(ctx)
Expand All @@ -174,9 +171,8 @@ func (s *RayJobSubmissionServiceServer) StopRayJob(ctx context.Context, req *api
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
if err != nil {
return nil, err
}
err = rayDashboardClient.StopJob(ctx, req.Submissionid)
Expand All @@ -194,9 +190,8 @@ func (s *RayJobSubmissionServiceServer) DeleteRayJob(ctx context.Context, req *a
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
rayDashboardClient, err := s.dashboardClientFunc(nil, *url)
if err != nil {
return nil, err
}
err = rayDashboardClient.DeleteJob(ctx, req.Submissionid)
Expand Down
3 changes: 2 additions & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

Expand Down Expand Up @@ -75,7 +76,7 @@ type Configuration struct {
EnableMetrics bool `json:"enableMetrics,omitempty"`
}

func (config Configuration) GetDashboardClient(mgr manager.Manager) func() utils.RayDashboardClientInterface {
func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error) {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
}

Expand Down
14 changes: 8 additions & 6 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type RayJobReconciler struct {
Scheme *runtime.Scheme
Recorder record.EventRecorder

dashboardClientFunc func() utils.RayDashboardClientInterface
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error)
options RayJobReconcilerOptions
}

Expand Down Expand Up @@ -115,9 +115,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
logger.Error(err, "Failed to get RayCluster")
}

rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(ctx, rayJobInstance.Status.DashboardURL, rayClusterInstance); err != nil {
logger.Error(err, "Failed to initialize dashboard client")
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
if err != nil {
logger.Error(err, "Failed to get dashboard client for RayJob")
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil {
logger.Error(err, "Failed to stop job for RayJob")
Expand Down Expand Up @@ -260,8 +261,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}

// Check the current status of ray jobs
rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(ctx, rayJobInstance.Status.DashboardURL, rayClusterInstance); err != nil {
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
if err != nil {
logger.Error(err, "Failed to get dashboard client for RayJob")
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}

Expand Down
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type RayServiceReconciler struct {
// Cache value is map of RayCluster name to Serve application config.
ServeConfigs *lru.Cache
RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]
dashboardClientFunc func() utils.RayDashboardClientInterface
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error)
httpProxyClientFunc func() utils.RayHttpProxyClientInterface
}

Expand Down Expand Up @@ -943,8 +943,8 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
return false, serveApplications, err
}

rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(ctx, clientURL, rayClusterInstance); err != nil {
rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, clientURL)
if err != nil {
return false, serveApplications, err
}

Expand Down
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ var (

type TestClientProvider struct{}

func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func() utils.RayDashboardClientInterface {
return func() utils.RayDashboardClientInterface {
return fakeRayDashboardClient
func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (utils.RayDashboardClientInterface, error) {
return func(_ *rayv1.RayCluster, _ string) (utils.RayDashboardClientInterface, error) {
return fakeRayDashboardClient, nil
}
}

Expand Down
46 changes: 1 addition & 45 deletions ray-operator/controllers/ray/utils/dashboard_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"io"
"net/http"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/json"
Expand All @@ -25,7 +24,6 @@ var (
)

type RayDashboardClientInterface interface {
InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error
UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*ServeDetails, error)
Expand All @@ -39,53 +37,11 @@ type RayDashboardClientInterface interface {
DeleteJob(ctx context.Context, jobName string) error
}

type BaseDashboardClient struct {
type RayDashboardClient struct {
client *http.Client
dashboardURL string
}

func GetRayDashboardClientFunc(mgr ctrl.Manager, useKubernetesProxy bool) func() RayDashboardClientInterface {
return func() RayDashboardClientInterface {
return &RayDashboardClient{
mgr: mgr,
useKubernetesProxy: useKubernetesProxy,
}
}
}

type RayDashboardClient struct {
mgr ctrl.Manager
BaseDashboardClient
useKubernetesProxy bool
}

func (r *RayDashboardClient) InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error {
log := ctrl.LoggerFrom(ctx)

if r.useKubernetesProxy {
var err error
headSvcName := rayCluster.Status.Head.ServiceName
if headSvcName == "" {
log.Info("RayCluster is missing .status.head.serviceName, calling GenerateHeadServiceName instead...", "RayCluster name", rayCluster.Name, "namespace", rayCluster.Namespace)
headSvcName, err = GenerateHeadServiceName(RayClusterCRD, rayCluster.Spec, rayCluster.Name)
if err != nil {
return err
}
}

r.client = r.mgr.GetHTTPClient()
r.dashboardURL = fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", r.mgr.GetConfig().Host, rayCluster.Namespace, headSvcName)
return nil
}

r.client = &http.Client{
Timeout: 2 * time.Second,
}

r.dashboardURL = "http://" + url
return nil
}

// UpdateDeployments update the deployments in the Ray cluster.
func (r *RayDashboardClient) UpdateDeployments(ctx context.Context, configJson []byte) error {
var req *http.Request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ var _ = Describe("RayFrameworkGenerator", func() {
}

rayDashboardClient = &RayDashboardClient{}
err := rayDashboardClient.InitClient(context.Background(), "127.0.0.1:8090", nil)
Expect(err).ToNot(HaveOccurred())
rayDashboardClient.dashboardURL = "http://127.0.0.1:8090"
rayDashboardClient.client = &http.Client{}
})

It("Test ConvertRayJobToReq", func() {
Expand Down
10 changes: 1 addition & 9 deletions ray-operator/controllers/ray/utils/fake_serve_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package utils
import (
"context"
"fmt"
"net/http"
"sync/atomic"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
Expand All @@ -12,18 +11,11 @@ import (
type FakeRayDashboardClient struct {
multiAppStatuses map[string]*ServeApplicationStatus
GetJobInfoMock atomic.Pointer[func(context.Context, string) (*RayJobInfo, error)]
BaseDashboardClient
serveDetails ServeDetails
serveDetails ServeDetails
}

var _ RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)

func (r *FakeRayDashboardClient) InitClient(_ context.Context, url string, _ *rayv1.RayCluster) error {
r.client = &http.Client{}
r.dashboardURL = "http://" + url
return nil
}

func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error {
fmt.Print("UpdateDeployments fake succeeds.")
return nil
Expand Down
30 changes: 29 additions & 1 deletion ray-operator/controllers/ray/utils/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"encoding/base32"
"fmt"
"math"
"net/http"
"os"
"reflect"
"strconv"
"strings"
"time"
"unicode"

batchv1 "k8s.io/api/batch/v1"
Expand Down Expand Up @@ -638,7 +640,7 @@ func EnvVarByName(envName string, envVars []corev1.EnvVar) (corev1.EnvVar, bool)
}

type ClientProvider interface {
GetDashboardClient(mgr manager.Manager) func() RayDashboardClientInterface
GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error)
GetHttpProxyClient(mgr manager.Manager) func() RayHttpProxyClientInterface
}

Expand Down Expand Up @@ -754,3 +756,29 @@ func FetchHeadServiceURL(ctx context.Context, cli client.Client, rayCluster *ray
port)
return headServiceURL, nil
}

func GetRayDashboardClientFunc(mgr manager.Manager, useKubernetesProxy bool) func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error) {
return func(rayCluster *rayv1.RayCluster, url string) (RayDashboardClientInterface, error) {
if useKubernetesProxy {
var err error
headSvcName := rayCluster.Status.Head.ServiceName
if headSvcName == "" {
headSvcName, err = GenerateHeadServiceName(RayClusterCRD, rayCluster.Spec, rayCluster.Name)
if err != nil {
return nil, err
}
}
return &RayDashboardClient{
client: mgr.GetHTTPClient(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the difference between mgr.GetHTTPClient and http.Client{ ... }?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are using Kubernetes proxy, we should use mgr.GetHTTPClient which provides client with proper auth to make request to Kubernetes api server.
Or we will get error like this:

{"level":"error","ts":"2025-08-28T03:13:20.716Z","logger":"controllers.RayJob",
"msg":"Failed to get job info",
"RayJob":{"name":"rayjob-sample","namespace":"default"},"reconcileID":"6b31ed8f-4d3c-4d39-93b2-3e8a2741ce43","JobId":"rayjob-sample-zhmhv",
"error":"Get \"https://127.0.0.1:38963/api/v1/namespaces/default/services/rayjob-sample-dksrt-head-svc:dashboard/proxy/api/jobs/rayjob-sample-zhmhv\": tls: failed to verify certificate: x509: certificate signed by unknown authority"}

dashboardURL: fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", mgr.GetConfig().Host, rayCluster.Namespace, headSvcName),
}, nil
}

return &RayDashboardClient{
client: &http.Client{
Timeout: 2 * time.Second,
},
dashboardURL: "http://" + url,
}, nil
}
}
5 changes: 2 additions & 3 deletions ray-operator/test/sampleyaml/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func AllAppsRunning(rayService *rayv1.RayService) bool {

func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomega) {
return func(g Gomega) {
rayDashboardClient := &utils.RayDashboardClient{}
pod, err := GetHeadPod(t, rayCluster)
g.Expect(err).ToNot(HaveOccurred())

Expand All @@ -76,8 +75,8 @@ func QueryDashboardGetAppStatus(t Test, rayCluster *rayv1.RayCluster) func(Gomeg

g.Expect(err).ToNot(HaveOccurred())
url := fmt.Sprintf("127.0.0.1:%d", localPort)

err = rayDashboardClient.InitClient(t.Ctx(), url, rayCluster)
rayDashboardClientFunc := utils.GetRayDashboardClientFunc(nil, false)
rayDashboardClient, err := rayDashboardClientFunc(rayCluster, url)
g.Expect(err).ToNot(HaveOccurred())
serveDetails, err := rayDashboardClient.GetServeDetails(t.Ctx())
g.Expect(err).ToNot(HaveOccurred())
Expand Down
Loading