diff --git a/internal/pkg/janitor/queries/stale_and_canceling_jobs_select.sql b/internal/pkg/janitor/queries/stale_and_canceling_jobs_select.sql index 36a7183..4c9ee3e 100644 --- a/internal/pkg/janitor/queries/stale_and_canceling_jobs_select.sql +++ b/internal/pkg/janitor/queries/stale_and_canceling_jobs_select.sql @@ -1,12 +1,14 @@ with candidate_ids as ( - -- canceling jobs + -- canceling jobs that were never claimed by an agent or stopped heartbeating select j.system_job_id from jobs j + left join active_jobs aj on aj.system_job_id = j.system_job_id where j.job_status_id = 7 + and (aj.system_job_id is null or aj.last_heartbeat is null) union - -- stale active jobs (drive from active_jobs) + -- stale jobs still in active_jobs table (worker stopped heartbeating) select aj.system_job_id from active_jobs aj where aj.last_heartbeat > 0 @@ -34,5 +36,4 @@ select from picked p join commands cm on cm.system_command_id = p.job_command_id join clusters cl on cl.system_cluster_id = p.job_cluster_id -order by p.system_job_id; - +order by p.system_job_id; \ No newline at end of file diff --git a/internal/pkg/object/command/sparkeks/sparkeks.go b/internal/pkg/object/command/sparkeks/sparkeks.go index 724851b..1b60f0d 100644 --- a/internal/pkg/object/command/sparkeks/sparkeks.go +++ b/internal/pkg/object/command/sparkeks/sparkeks.go @@ -22,6 +22,7 @@ import ( "github.com/kubeflow/spark-operator/v2/api/v1beta2" sparkClientSet "github.com/kubeflow/spark-operator/v2/pkg/client/clientset/versioned" corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" @@ -190,7 +191,59 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job. } func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error { - // TODO: Implement cleanup if needed + + // get app name and namespace from job id and command context + appName := fmt.Sprintf("%s-%s", applicationPrefix, jobID) + namespace := s.KubeNamespace + if namespace == "" { + namespace = defaultNamespace + } + + // Parse cluster context (region/role arn, etc). + clusterContext := &clusterContext{} + if c != nil && c.Context != nil { + if err := c.Context.Unmarshal(clusterContext); err != nil { + return fmt.Errorf("failed to unmarshal cluster context: %w", err) + } + } + + // build execution context + execCtx := &executionContext{ + job: &job.Job{}, + cluster: c, + commandContext: s, + clusterContext: clusterContext, + appName: appName, + } + execCtx.job.ID = jobID + + // create spark clients + if err := createSparkClients(ctx, execCtx); err != nil { + return err + } + + // delete spark application + if err := deleteSparkApplication(ctx, execCtx.sparkClient, namespace, appName); err != nil { + return fmt.Errorf("failed to cleanup SparkApplication %s/%s: %w", namespace, appName, err) + } + + return nil +} + +func deleteSparkApplication(ctx context.Context, sparkClient *sparkClientSet.Clientset, namespace, name string) error { + sparkApps := sparkClient.SparkoperatorV1beta2().SparkApplications(namespace) + _, err := sparkApps.Get(ctx, name, metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + if err := sparkApps.Delete(ctx, name, metav1.DeleteOptions{}); err != nil && !apierrors.IsNotFound(err) { + return err + } + return nil } @@ -307,17 +360,7 @@ func (e *executionContext) cleanupSparkApp(ctx context.Context) error { return nil } - name, namespace := e.submittedApp.Name, e.submittedApp.Namespace - _, getErr := e.sparkClient.SparkoperatorV1beta2().SparkApplications(namespace).Get(ctx, name, metav1.GetOptions{}) - if getErr == nil { - // Application exists, proceed with deletion - cleanupErr := e.sparkClient.SparkoperatorV1beta2().SparkApplications(namespace).Delete(ctx, name, metav1.DeleteOptions{}) - if cleanupErr != nil && !strings.Contains(cleanupErr.Error(), "not found") { - return cleanupErr // Return error if deletion fails for reasons other than "not found" - } - e.runtime.Stdout.WriteString(fmt.Sprintf("Cleaned up Spark application: %s/%s\n", namespace, name)) - } - return nil // "not found" is a successful cleanup state + return deleteSparkApplication(ctx, e.sparkClient, e.submittedApp.Namespace, e.submittedApp.Name) } // getAndStoreResults fetches the job output from S3 and stores it. @@ -501,7 +544,9 @@ func createSparkClients(ctx context.Context, execCtx *executionContext) error { // Create Spark Operator client sparkClient, err := sparkClientSet.NewForConfig(clientConfig) if err != nil { - execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Failed to create Spark Operator client: %v\n", err)) + if execCtx.runtime != nil && execCtx.runtime.Stderr != nil { + execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Failed to create Spark Operator client: %v\n", err)) + } return ErrKubeConfig } execCtx.sparkClient = sparkClient @@ -509,12 +554,16 @@ func createSparkClients(ctx context.Context, execCtx *executionContext) error { // Create Kubernetes client kubeClient, err := kubernetes.NewForConfig(clientConfig) if err != nil { - execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Failed to create Kubernetes client: %v\n", err)) + if execCtx.runtime != nil && execCtx.runtime.Stderr != nil { + execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Failed to create Kubernetes client: %v\n", err)) + } return ErrKubeConfig } execCtx.kubeClient = kubeClient - execCtx.runtime.Stdout.WriteString(fmt.Sprintf("Successfully created Spark Operator and Kubernetes clients for cluster: %s\n", execCtx.cluster.Name)) + if execCtx.runtime != nil && execCtx.runtime.Stdout != nil { + execCtx.runtime.Stdout.WriteString(fmt.Sprintf("Successfully created Spark Operator and Kubernetes clients for cluster: %s\n", execCtx.cluster.Name)) + } return nil } @@ -529,7 +578,11 @@ func updateKubeConfig(ctx context.Context, execCtx *executionContext) (string, e } // Create a temporary file for the kubeconfig - tmpfile, err := os.CreateTemp(execCtx.runtime.WorkingDirectory, "kubeconfig-") + baseDir := "" + if execCtx.runtime != nil { + baseDir = execCtx.runtime.WorkingDirectory + } + tmpfile, err := os.CreateTemp(baseDir, "kubeconfig-") if err != nil { return "", fmt.Errorf("failed to create temp file for kubeconfig: %w", err) } @@ -614,6 +667,7 @@ func applySparkOperatorConfig(execCtx *executionContext) { sparkApp.Spec.SparkConf = make(map[string]string) } + // Add default spark properties sparkApp.Spec.SparkConf[sparkAppNameProperty] = execCtx.appName if execCtx.logURI != "" {