Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 9 additions & 31 deletions apiserver/pkg/server/ray_job_submission_service_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"sigs.k8s.io/yaml"

api "github.com/ray-project/kuberay/proto/go_client"
rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

Expand All @@ -31,7 +32,7 @@ type RayJobSubmissionServiceServer struct {
api.UnimplementedRayJobSubmissionServiceServer
options *RayJobSubmissionServiceServerOptions
clusterServer *ClusterServer
dashboardClientFunc func() utils.RayDashboardClientInterface
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface
log logr.Logger
}

Expand All @@ -49,11 +50,8 @@ func (s *RayJobSubmissionServiceServer) SubmitRayJob(ctx context.Context, req *a
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
rayDashboardClient := s.dashboardClientFunc(nil, *url)
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
request := &utils.RayJobRequest{Entrypoint: req.Jobsubmission.Entrypoint}
if req.Jobsubmission.SubmissionId != "" {
request.SubmissionId = req.Jobsubmission.SubmissionId
Expand Down Expand Up @@ -89,7 +87,7 @@ func (s *RayJobSubmissionServiceServer) SubmitRayJob(ctx context.Context, req *a
}
}

sid, err := rayDashboardClient.SubmitJobReq(ctx, request, nil)
sid, err := rayDashboardClient.SubmitJobReq(ctx, request)
if err != nil {
return nil, err
}
Expand All @@ -104,11 +102,7 @@ func (s *RayJobSubmissionServiceServer) GetJobDetails(ctx context.Context, req *
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc(nil, *url)
nodeInfo, err := rayDashboardClient.GetJobInfo(ctx, req.Submissionid)
if err != nil {
return nil, err
Expand All @@ -127,11 +121,7 @@ func (s *RayJobSubmissionServiceServer) GetJobLog(ctx context.Context, req *api.
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc(nil, *url)
jlog, err := rayDashboardClient.GetJobLog(ctx, req.Submissionid)
if err != nil {
return nil, err
Expand All @@ -150,11 +140,7 @@ func (s *RayJobSubmissionServiceServer) ListJobDetails(ctx context.Context, req
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc(nil, *url)
nodesInfo, err := rayDashboardClient.ListJobs(ctx)
if err != nil {
return nil, err
Expand All @@ -174,11 +160,7 @@ func (s *RayJobSubmissionServiceServer) StopRayJob(ctx context.Context, req *api
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc(nil, *url)
err = rayDashboardClient.StopJob(ctx, req.Submissionid)
if err != nil {
return nil, err
Expand All @@ -194,11 +176,7 @@ func (s *RayJobSubmissionServiceServer) DeleteRayJob(ctx context.Context, req *a
if err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc()
// TODO: support proxy subresources in kuberay-apiserver
if err := rayDashboardClient.InitClient(ctx, *url, nil); err != nil {
return nil, err
}
rayDashboardClient := s.dashboardClientFunc(nil, *url)
err = rayDashboardClient.DeleteJob(ctx, req.Submissionid)
if err != nil {
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion ray-operator/apis/config/v1alpha1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/manager"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

Expand Down Expand Up @@ -75,7 +76,7 @@ type Configuration struct {
EnableMetrics bool `json:"enableMetrics,omitempty"`
}

func (config Configuration) GetDashboardClient(mgr manager.Manager) func() utils.RayDashboardClientInterface {
func (config Configuration) GetDashboardClient(mgr manager.Manager) func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface {
return utils.GetRayDashboardClientFunc(mgr, config.UseKubernetesProxy)
}

Expand Down
12 changes: 3 additions & 9 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type RayJobReconciler struct {
Scheme *runtime.Scheme
Recorder record.EventRecorder

dashboardClientFunc func() utils.RayDashboardClientInterface
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface
options RayJobReconcilerOptions
}

Expand Down Expand Up @@ -115,10 +115,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
logger.Error(err, "Failed to get RayCluster")
}

rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(ctx, rayJobInstance.Status.DashboardURL, rayClusterInstance); err != nil {
logger.Error(err, "Failed to initialize dashboard client")
}
rayDashboardClient := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
if err := rayDashboardClient.StopJob(ctx, rayJobInstance.Status.JobId); err != nil {
logger.Error(err, "Failed to stop job for RayJob")
}
Expand Down Expand Up @@ -260,10 +257,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
}

// Check the current status of ray jobs
rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(ctx, rayJobInstance.Status.DashboardURL, rayClusterInstance); err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
rayDashboardClient := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)

jobInfo, err := rayDashboardClient.GetJobInfo(ctx, rayJobInstance.Status.JobId)
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type RayServiceReconciler struct {
// Cache value is map of RayCluster name to Serve application config.
ServeConfigs *lru.Cache
RayClusterDeletionTimestamps cmap.ConcurrentMap[string, time.Time]
dashboardClientFunc func() utils.RayDashboardClientInterface
dashboardClientFunc func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface
httpProxyClientFunc func() utils.RayHttpProxyClientInterface
}

Expand Down Expand Up @@ -943,10 +943,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
return false, serveApplications, err
}

rayDashboardClient := r.dashboardClientFunc()
if err := rayDashboardClient.InitClient(ctx, clientURL, rayClusterInstance); err != nil {
return false, serveApplications, err
}
rayDashboardClient := r.dashboardClientFunc(rayClusterInstance, clientURL)

cachedServeConfigV2 := r.getServeConfigFromCache(rayServiceInstance, rayClusterInstance.Name)
isReady, serveApplications, err := getAndCheckServeStatus(ctx, rayDashboardClient)
Expand Down
4 changes: 2 additions & 2 deletions ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ var (

type TestClientProvider struct{}

func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func() utils.RayDashboardClientInterface {
return func() utils.RayDashboardClientInterface {
func (testProvider TestClientProvider) GetDashboardClient(_ manager.Manager) func(rayCluster *rayv1.RayCluster, url string) utils.RayDashboardClientInterface {
return func(_ *rayv1.RayCluster, _ string) utils.RayDashboardClientInterface {
return fakeRayDashboardClient
}
}
Expand Down
75 changes: 4 additions & 71 deletions ray-operator/controllers/ray/utils/dashboard_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ import (
"fmt"
"io"
"net/http"
"time"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"
ctrl "sigs.k8s.io/controller-runtime"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
)
Expand All @@ -25,67 +22,24 @@ var (
)

type RayDashboardClientInterface interface {
InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error
UpdateDeployments(ctx context.Context, configJson []byte) error
// V2/multi-app Rest API
GetServeDetails(ctx context.Context) (*ServeDetails, error)
GetMultiApplicationStatus(context.Context) (map[string]*ServeApplicationStatus, error)
GetJobInfo(ctx context.Context, jobId string) (*RayJobInfo, error)
ListJobs(ctx context.Context) (*[]RayJobInfo, error)
SubmitJob(ctx context.Context, rayJob *rayv1.RayJob) (string, error)
SubmitJobReq(ctx context.Context, request *RayJobRequest, name *string) (string, error)
SubmitJobReq(ctx context.Context, request *RayJobRequest) (string, error)
GetJobLog(ctx context.Context, jobName string) (*string, error)
StopJob(ctx context.Context, jobName string) error
DeleteJob(ctx context.Context, jobName string) error
}

type BaseDashboardClient struct {
type RayDashboardClient struct {
client *http.Client
dashboardURL string
}

func GetRayDashboardClientFunc(mgr ctrl.Manager, useKubernetesProxy bool) func() RayDashboardClientInterface {
return func() RayDashboardClientInterface {
return &RayDashboardClient{
mgr: mgr,
useKubernetesProxy: useKubernetesProxy,
}
}
}

type RayDashboardClient struct {
mgr ctrl.Manager
BaseDashboardClient
useKubernetesProxy bool
}

func (r *RayDashboardClient) InitClient(ctx context.Context, url string, rayCluster *rayv1.RayCluster) error {
log := ctrl.LoggerFrom(ctx)

if r.useKubernetesProxy {
var err error
headSvcName := rayCluster.Status.Head.ServiceName
if headSvcName == "" {
log.Info("RayCluster is missing .status.head.serviceName, calling GenerateHeadServiceName instead...", "RayCluster name", rayCluster.Name, "namespace", rayCluster.Namespace)
headSvcName, err = GenerateHeadServiceName(RayClusterCRD, rayCluster.Spec, rayCluster.Name)
if err != nil {
return err
}
}

r.client = r.mgr.GetHTTPClient()
r.dashboardURL = fmt.Sprintf("%s/api/v1/namespaces/%s/services/%s:dashboard/proxy", r.mgr.GetConfig().Host, rayCluster.Namespace, headSvcName)
return nil
}

r.client = &http.Client{
Timeout: 2 * time.Second,
}

r.dashboardURL = "http://" + url
return nil
}

// UpdateDeployments update the deployments in the Ray cluster.
func (r *RayDashboardClient) UpdateDeployments(ctx context.Context, configJson []byte) error {
var req *http.Request
Expand Down Expand Up @@ -276,18 +230,14 @@ func (r *RayDashboardClient) SubmitJob(ctx context.Context, rayJob *rayv1.RayJob
if err != nil {
return "", err
}
return r.SubmitJobReq(ctx, request, &rayJob.Name)
return r.SubmitJobReq(ctx, request)
}

func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *RayJobRequest, name *string) (jobId string, err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you open another PR to remove the logs instead?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will do when this is merged.

log := ctrl.LoggerFrom(ctx)
func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *RayJobRequest) (jobId string, err error) {
rayJobJson, err := json.Marshal(request)
if err != nil {
return
}
if name != nil {
log.Info("Submit a ray job", "rayJob", name, "jobInfo", string(rayJobJson))
}

req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.dashboardURL+JobPath, bytes.NewBuffer(rayJobJson))
if err != nil {
Expand Down Expand Up @@ -321,9 +271,6 @@ func (r *RayDashboardClient) SubmitJobReq(ctx context.Context, request *RayJobRe

// Get Job Log
func (r *RayDashboardClient) GetJobLog(ctx context.Context, jobName string) (*string, error) {
log := ctrl.LoggerFrom(ctx)
log.Info("Get ray job log", "rayJob", jobName)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, r.dashboardURL+JobPath+jobName+"/logs", nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -355,9 +302,6 @@ func (r *RayDashboardClient) GetJobLog(ctx context.Context, jobName string) (*st
}

func (r *RayDashboardClient) StopJob(ctx context.Context, jobName string) (err error) {
log := ctrl.LoggerFrom(ctx)
log.Info("Stop a ray job", "rayJob", jobName)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, r.dashboardURL+JobPath+jobName+"/stop", nil)
if err != nil {
return err
Expand Down Expand Up @@ -394,9 +338,6 @@ func (r *RayDashboardClient) StopJob(ctx context.Context, jobName string) (err e
}

func (r *RayDashboardClient) DeleteJob(ctx context.Context, jobName string) error {
log := ctrl.LoggerFrom(ctx)
log.Info("Delete a ray job", "rayJob", jobName)

req, err := http.NewRequestWithContext(ctx, http.MethodDelete, r.dashboardURL+JobPath+jobName, nil)
if err != nil {
return err
Expand Down Expand Up @@ -434,11 +375,3 @@ func ConvertRayJobToReq(rayJob *rayv1.RayJob) (*RayJobRequest, error) {
}
return req, nil
}

func UnmarshalRuntimeEnvYAML(runtimeEnvYAML string) (RuntimeEnvType, error) {
var runtimeEnv RuntimeEnvType
if err := yaml.Unmarshal([]byte(runtimeEnvYAML), &runtimeEnv); err != nil {
return nil, fmt.Errorf("failed to unmarshal RuntimeEnvYAML: %v: %w", runtimeEnvYAML, err)
}
return runtimeEnv, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ var _ = Describe("RayFrameworkGenerator", func() {
}

rayDashboardClient = &RayDashboardClient{}
err := rayDashboardClient.InitClient(context.Background(), "127.0.0.1:8090", nil)
Expect(err).ToNot(HaveOccurred())
rayDashboardClient.dashboardURL = "http://127.0.0.1:8090"
rayDashboardClient.client = &http.Client{}
})

It("Test ConvertRayJobToReq", func() {
Expand Down
12 changes: 2 additions & 10 deletions ray-operator/controllers/ray/utils/fake_serve_httpclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package utils
import (
"context"
"fmt"
"net/http"
"sync/atomic"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
Expand All @@ -12,18 +11,11 @@ import (
type FakeRayDashboardClient struct {
multiAppStatuses map[string]*ServeApplicationStatus
GetJobInfoMock atomic.Pointer[func(context.Context, string) (*RayJobInfo, error)]
BaseDashboardClient
serveDetails ServeDetails
serveDetails ServeDetails
}

var _ RayDashboardClientInterface = (*FakeRayDashboardClient)(nil)

func (r *FakeRayDashboardClient) InitClient(_ context.Context, url string, _ *rayv1.RayCluster) error {
r.client = &http.Client{}
r.dashboardURL = "http://" + url
return nil
}

func (r *FakeRayDashboardClient) UpdateDeployments(_ context.Context, _ []byte) error {
fmt.Print("UpdateDeployments fake succeeds.")
return nil
Expand Down Expand Up @@ -63,7 +55,7 @@ func (r *FakeRayDashboardClient) SubmitJob(_ context.Context, _ *rayv1.RayJob) (
return "", nil
}

func (r *FakeRayDashboardClient) SubmitJobReq(_ context.Context, _ *RayJobRequest, _ *string) (string, error) {
func (r *FakeRayDashboardClient) SubmitJobReq(_ context.Context, _ *RayJobRequest) (string, error) {
return "", nil
}

Expand Down
Loading
Loading