Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
with candidate_ids as (
-- canceling jobs
-- canceling jobs that were never claimed by an agent or stopped heartbeating
select j.system_job_id
from jobs j
left join active_jobs aj on aj.system_job_id = j.system_job_id
where j.job_status_id = 7
and (aj.system_job_id is null or aj.last_heartbeat is null)

union

-- stale active jobs (drive from active_jobs)
-- stale jobs still in active_jobs table (worker stopped heartbeating)
select aj.system_job_id
from active_jobs aj
where aj.last_heartbeat > 0
Expand Down Expand Up @@ -34,5 +36,4 @@ select
from picked p
join commands cm on cm.system_command_id = p.job_command_id
join clusters cl on cl.system_cluster_id = p.job_cluster_id
order by p.system_job_id;

order by p.system_job_id;
86 changes: 70 additions & 16 deletions internal/pkg/object/command/sparkeks/sparkeks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/kubeflow/spark-operator/v2/api/v1beta2"
sparkClientSet "github.com/kubeflow/spark-operator/v2/pkg/client/clientset/versioned"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
Expand Down Expand Up @@ -190,7 +191,59 @@ func (s *commandContext) Execute(ctx context.Context, r *plugin.Runtime, j *job.
}

// Cleanup deletes the SparkApplication associated with jobID from the
// configured namespace. A missing application is treated as already cleaned
// up (deleteSparkApplication returns nil on "not found"), so Cleanup is safe
// to call more than once.
func (s *commandContext) Cleanup(ctx context.Context, jobID string, c *cluster.Cluster) error {

	// Derive the application name from the job ID; presumably this matches
	// the name used at submission time — TODO(review): confirm against Execute.
	appName := fmt.Sprintf("%s-%s", applicationPrefix, jobID)
	namespace := s.KubeNamespace
	if namespace == "" {
		namespace = defaultNamespace
	}

	// Parse cluster context (region/role arn, etc). A nil cluster or nil
	// cluster context leaves the zero-value clusterContext in place.
	clusterContext := &clusterContext{}
	if c != nil && c.Context != nil {
		if err := c.Context.Unmarshal(clusterContext); err != nil {
			return fmt.Errorf("failed to unmarshal cluster context: %w", err)
		}
	}

	// Build a minimal execution context: only the fields needed by
	// createSparkClients and the delete call below are populated; the job
	// carries just its ID.
	execCtx := &executionContext{
		job:            &job.Job{},
		cluster:        c,
		commandContext: s,
		clusterContext: clusterContext,
		appName:        appName,
	}
	execCtx.job.ID = jobID

	// Create the spark-operator (and kube) clients for the target cluster.
	if err := createSparkClients(ctx, execCtx); err != nil {
		return err
	}

	// Delete the spark application; "not found" is a successful outcome.
	if err := deleteSparkApplication(ctx, execCtx.sparkClient, namespace, appName); err != nil {
		return fmt.Errorf("failed to cleanup SparkApplication %s/%s: %w", namespace, appName, err)
	}

	return nil
}

// deleteSparkApplication deletes the named SparkApplication in namespace.
// An application that does not exist is treated as success, making the
// operation idempotent and safe for repeated cleanup attempts.
func deleteSparkApplication(ctx context.Context, sparkClient *sparkClientSet.Clientset, namespace, name string) error {
	// Delete directly instead of Get-then-Delete: the pre-flight Get was a
	// TOCTOU race (the app could disappear between the two calls) and cost
	// an extra API round trip, while Delete already reports NotFound when
	// the application is absent — which we tolerate below.
	err := sparkClient.SparkoperatorV1beta2().SparkApplications(namespace).Delete(ctx, name, metav1.DeleteOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		return err
	}
	return nil
}

Expand Down Expand Up @@ -307,17 +360,7 @@ func (e *executionContext) cleanupSparkApp(ctx context.Context) error {
return nil
}

name, namespace := e.submittedApp.Name, e.submittedApp.Namespace
_, getErr := e.sparkClient.SparkoperatorV1beta2().SparkApplications(namespace).Get(ctx, name, metav1.GetOptions{})
if getErr == nil {
// Application exists, proceed with deletion
cleanupErr := e.sparkClient.SparkoperatorV1beta2().SparkApplications(namespace).Delete(ctx, name, metav1.DeleteOptions{})
if cleanupErr != nil && !strings.Contains(cleanupErr.Error(), "not found") {
return cleanupErr // Return error if deletion fails for reasons other than "not found"
}
e.runtime.Stdout.WriteString(fmt.Sprintf("Cleaned up Spark application: %s/%s\n", namespace, name))
}
return nil // "not found" is a successful cleanup state
return deleteSparkApplication(ctx, e.sparkClient, e.submittedApp.Namespace, e.submittedApp.Name)
}

// getAndStoreResults fetches the job output from S3 and stores it.
Expand Down Expand Up @@ -501,20 +544,26 @@ func createSparkClients(ctx context.Context, execCtx *executionContext) error {
// Create Spark Operator client
sparkClient, err := sparkClientSet.NewForConfig(clientConfig)
if err != nil {
execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Failed to create Spark Operator client: %v\n", err))
if execCtx.runtime != nil && execCtx.runtime.Stderr != nil {
execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Failed to create Spark Operator client: %v\n", err))
}
return ErrKubeConfig
}
execCtx.sparkClient = sparkClient

// Create Kubernetes client
kubeClient, err := kubernetes.NewForConfig(clientConfig)
if err != nil {
execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Failed to create Kubernetes client: %v\n", err))
if execCtx.runtime != nil && execCtx.runtime.Stderr != nil {
execCtx.runtime.Stderr.WriteString(fmt.Sprintf("Failed to create Kubernetes client: %v\n", err))
}
return ErrKubeConfig
}
execCtx.kubeClient = kubeClient

execCtx.runtime.Stdout.WriteString(fmt.Sprintf("Successfully created Spark Operator and Kubernetes clients for cluster: %s\n", execCtx.cluster.Name))
if execCtx.runtime != nil && execCtx.runtime.Stdout != nil {
execCtx.runtime.Stdout.WriteString(fmt.Sprintf("Successfully created Spark Operator and Kubernetes clients for cluster: %s\n", execCtx.cluster.Name))
}
return nil
}

Expand All @@ -529,7 +578,11 @@ func updateKubeConfig(ctx context.Context, execCtx *executionContext) (string, e
}

// Create a temporary file for the kubeconfig
tmpfile, err := os.CreateTemp(execCtx.runtime.WorkingDirectory, "kubeconfig-")
baseDir := ""
if execCtx.runtime != nil {
baseDir = execCtx.runtime.WorkingDirectory
}
tmpfile, err := os.CreateTemp(baseDir, "kubeconfig-")
if err != nil {
return "", fmt.Errorf("failed to create temp file for kubeconfig: %w", err)
}
Expand Down Expand Up @@ -614,6 +667,7 @@ func applySparkOperatorConfig(execCtx *executionContext) {
sparkApp.Spec.SparkConf = make(map[string]string)
}

// Add default spark properties
sparkApp.Spec.SparkConf[sparkAppNameProperty] = execCtx.appName

if execCtx.logURI != "" {
Expand Down