From a6680c13c728b9dd6765410ab4196edee8f0b1e6 Mon Sep 17 00:00:00 2001
From: Future-Outlier
Date: Mon, 22 Sep 2025 13:58:30 +0800
Subject: [PATCH 1/2] [draft pr][RayJob] Use timeout to prevent RayCluster leak

Signed-off-by: Future-Outlier
---
 ray-operator/apis/ray/v1/rayjob_types.go          |  1 +
 .../controllers/ray/rayjob_controller.go          | 43 +++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go
index 8f78eceed07..7beff192bee 100644
--- a/ray-operator/apis/ray/v1/rayjob_types.go
+++ b/ray-operator/apis/ray/v1/rayjob_types.go
@@ -76,6 +76,7 @@ const (
     AppFailed                                        JobFailedReason = "AppFailed"
     JobDeploymentStatusTransitionGracePeriodExceeded JobFailedReason = "JobDeploymentStatusTransitionGracePeriodExceeded"
     ValidationFailed                                 JobFailedReason = "ValidationFailed"
+    JobRunningTimeoutExceeded                        JobFailedReason = "JobRunningTimeoutExceeded"
 )
 
 type JobSubmissionMode string
diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index cae16cd0bc8..a94320b7279 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -279,6 +279,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
             return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
         }
 
+        // if shouldUpdate := checkJobRunningTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate {
+        //     break
+        // }
+
         // If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job
         // to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob
         // as "Complete" or "Failed" to avoid unnecessary reconciliation.
@@ -463,6 +467,45 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
 }
 
+// func checkJobRunningTimeoutAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob, jobInfo *rayv1.RayJobStatusInfo) bool {
+//     logger := ctrl.LoggerFrom(ctx)
+
+//     if jobInfo.JobStatus != rayv1.JobStatusRunning {
+//         return false
+//     }
+
+//     timeoutSeconds := int32(30)
+//     if rayJob.Spec.JobRunningTimeoutSeconds != nil {
+//         timeoutSeconds = *rayJob.Spec.JobRunningTimeoutSeconds
+//     }
+
+//     var jobStartTime time.Time
+//     if jobInfo.StartTime != 0 {
+//         jobStartTime = time.UnixMilli(utils.SafeUint64ToInt64(jobInfo.StartTime))
+//     } else if rayJob.Status.RayJobStatusInfo.StartTime != nil {
+//         jobStartTime = rayJob.Status.RayJobStatusInfo.StartTime.Time
+//     } else {
+//         jobStartTime = rayJob.Status.StartTime.Time
+//     }
+
+//     timeoutTime := jobStartTime.Add(time.Duration(timeoutSeconds) * time.Second)
+
+//     if time.Now().After(timeoutTime) {
+//         logger.Info("JobRunningTimeout exceeded. Forcing JobDeploymentStatus to Failed.",
+//             "JobStartTime", jobStartTime,
+//             "TimeoutSeconds", timeoutSeconds,
+//             "JobStatus", jobInfo.JobStatus)
+
+//         rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
+//         rayJob.Status.Reason = rayv1.JobRunningTimeoutExceeded
+//         rayJob.Status.Message = fmt.Sprintf("JobRunningTimeout exceeded. JobStartTime: %v, TimeoutSeconds: %d, JobStatus: %s",
+//             jobStartTime, timeoutSeconds, jobInfo.JobStatus)
+//         return true
+//     }
+
+//     return false
+// }
+
 func validateRayJob(ctx context.Context, rayJobInstance *rayv1.RayJob) (utils.K8sEventType, error) {
     logger := ctrl.LoggerFrom(ctx)
     validationRules := []struct {

From 8a4cd069c5bc9cf5fa96b3a4e19a59980dfe61b8 Mon Sep 17 00:00:00 2001
From: Future-Outlier
Date: Mon, 22 Sep 2025 14:07:11 +0800
Subject: [PATCH 2/2] update

Signed-off-by: Future-Outlier
---
 ray-operator/controllers/ray/rayjob_controller.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go
index a94320b7279..2e778d6cb23 100644
--- a/ray-operator/controllers/ray/rayjob_controller.go
+++ b/ray-operator/controllers/ray/rayjob_controller.go
@@ -258,6 +258,10 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
             }
         }
 
+        // if shouldUpdate := checkSubmitterFinishedTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate {
+        //     break
+        // }
+
         // Check the current status of ray jobs
         rayDashboardClient, err := r.dashboardClientFunc(rayClusterInstance, rayJobInstance.Status.DashboardURL)
         if err != nil {
@@ -279,10 +283,6 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
             return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
         }
 
-        // if shouldUpdate := checkJobRunningTimeoutAndUpdateStatusIfNeeded(ctx, rayJobInstance); shouldUpdate {
-        //     break
-        // }
-
         // If the JobStatus is in a terminal status, such as SUCCEEDED, FAILED, or STOPPED, it is impossible for the Ray job
         // to transition to any other. Additionally, RayJob does not currently support retries. Hence, we can mark the RayJob
         // as "Complete" or "Failed" to avoid unnecessary reconciliation.
@@ -467,7 +467,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
     return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, nil
 }
 
-// func checkJobRunningTimeoutAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob, jobInfo *rayv1.RayJobStatusInfo) bool {
+// func updateStatusIfNeededAfterSubmitterFinished(ctx context.Context, rayJob *rayv1.RayJob, jobInfo *rayv1.RayJobStatusInfo) bool {
 //     logger := ctrl.LoggerFrom(ctx)
 
 //     if jobInfo.JobStatus != rayv1.JobStatusRunning {
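
Reviewer aid: the helper that PATCH 1/2 adds (and PATCH 2/2 starts to rename) is easier to follow uncommented. The sketch below only reassembles that commented-out draft as plain Go, with comments added; it is not part of the patch and is not verified to compile. In particular, rayJob.Spec.JobRunningTimeoutSeconds (*int32) is assumed here and is not added anywhere in this series, while time.UnixMilli, utils.SafeUint64ToInt64, and the rayv1 types are used exactly as the draft references them. The import block mirrors what rayjob_controller.go already imports.

    import (
        "context"
        "fmt"
        "time"

        ctrl "sigs.k8s.io/controller-runtime"

        rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
        "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
    )

    // checkJobRunningTimeoutAndUpdateStatusIfNeeded is a reading-aid sketch of the draft helper
    // in PATCH 1/2. It reports whether the RayJob status was mutated and should be persisted.
    func checkJobRunningTimeoutAndUpdateStatusIfNeeded(ctx context.Context, rayJob *rayv1.RayJob, jobInfo *rayv1.RayJobStatusInfo) bool {
        logger := ctrl.LoggerFrom(ctx)

        // Only a job still reported as RUNNING can exceed the running timeout.
        if jobInfo.JobStatus != rayv1.JobStatusRunning {
            return false
        }

        // Default to 30 seconds unless the (assumed, not-yet-added) spec field overrides it.
        timeoutSeconds := int32(30)
        if rayJob.Spec.JobRunningTimeoutSeconds != nil {
            timeoutSeconds = *rayJob.Spec.JobRunningTimeoutSeconds
        }

        // Prefer the start time reported by the dashboard, then the cached status info,
        // then the RayJob's own StartTime.
        var jobStartTime time.Time
        if jobInfo.StartTime != 0 {
            jobStartTime = time.UnixMilli(utils.SafeUint64ToInt64(jobInfo.StartTime))
        } else if rayJob.Status.RayJobStatusInfo.StartTime != nil {
            jobStartTime = rayJob.Status.RayJobStatusInfo.StartTime.Time
        } else {
            jobStartTime = rayJob.Status.StartTime.Time
        }

        timeoutTime := jobStartTime.Add(time.Duration(timeoutSeconds) * time.Second)
        if time.Now().After(timeoutTime) {
            logger.Info("JobRunningTimeout exceeded. Forcing JobDeploymentStatus to Failed.",
                "JobStartTime", jobStartTime,
                "TimeoutSeconds", timeoutSeconds,
                "JobStatus", jobInfo.JobStatus)

            // Mark the RayJob as failed so the controller can tear down the RayCluster
            // instead of leaking it.
            rayJob.Status.JobDeploymentStatus = rayv1.JobDeploymentStatusFailed
            rayJob.Status.Reason = rayv1.JobRunningTimeoutExceeded
            rayJob.Status.Message = fmt.Sprintf("JobRunningTimeout exceeded. JobStartTime: %v, TimeoutSeconds: %d, JobStatus: %s",
                jobStartTime, timeoutSeconds, jobInfo.JobStatus)
            return true
        }

        return false
    }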