Skip to content

Commit

Permalink
Merge pull request #366 from liyinan926/ssa-fix
Browse files Browse the repository at this point in the history
Fixed the controller logic for ScheduledSparkApplications
  • Loading branch information
liyinan926 authored Jan 16, 2019
2 parents 8a84d6f + b867cc6 commit 97d4b14
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 214 deletions.
1 change: 0 additions & 1 deletion manifest/spark-operator-with-metrics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ spec:
- name: sparkoperator
image: gcr.io/spark-operator/spark-operator:v2.4.0-v1alpha1-latest
imagePullPolicy: Always
command: ["/usr/bin/spark-operator"]
ports:
- containerPort: 10254
args:
Expand Down
3 changes: 1 addition & 2 deletions manifest/spark-operator-with-webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ spec:
- name: webhook-certs
mountPath: /etc/webhook-certs
ports:
- containerPort: 8080
command: ["/usr/bin/spark-operator"]
- containerPort: 8080
args:
- -logtostderr
- -enable-webhook=true
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ const (
AffinityAnnotation = LabelAnnotationPrefix + "affinity"
// SparkAppNameLabel is the name of the label for the SparkApplication object name.
SparkAppNameLabel = LabelAnnotationPrefix + "app-name"
// ScheduledSparkAppNameLabel is the name of the label for the ScheduledSparkApplication object name.
ScheduledSparkAppNameLabel = LabelAnnotationPrefix + "scheduled-app-name"
// LaunchedBySparkOperatorLabel is a label on Spark pods launched through the Spark Operator.
LaunchedBySparkOperatorLabel = LabelAnnotationPrefix + "launched-by-spark-operator"

// SparkApplicationSelectorLabel is the AppID set by the spark-distribution on the driver/executors Pods.
SparkApplicationSelectorLabel = "spark-app-selector"
// SparkRoleLabel is the driver/executor label set by the operator/spark-distribution on the driver/executors Pods.
SparkRoleLabel = "spark-role"

// TolerationsAnnotationPrefix is the prefix of annotations that specify a Toleration.
TolerationsAnnotationPrefix = LabelAnnotationPrefix + "tolerations."
)
Expand Down
175 changes: 78 additions & 97 deletions pkg/controller/scheduledsparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package scheduledsparkapplication
import (
"fmt"
"reflect"
"sort"
"time"

"github.com/golang/glog"
"github.com/robfig/cron"

apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -41,6 +42,7 @@ import (
crdscheme "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/scheme"
crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
crdlisters "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/listers/sparkoperator.k8s.io/v1alpha1"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config"
)

var (
Expand Down Expand Up @@ -176,38 +178,26 @@ func (c *Controller) syncScheduledSparkApplication(key string) error {
status.NextRun = metav1.NewTime(nextRunTime)
}
if nextRunTime.Before(now) {
// The next run is due. Check if this is the first run of the application.
if status.LastRunName == "" {
// This is the first run of the application.
if err = c.startNextRun(app, status, schedule); err != nil {
return err
}
} else {
// Check if the condition for starting the next run is satisfied.
ok, err := c.shouldStartNextRun(app)
if err != nil {
return err
}
if ok {
// Start the next run if the condition is satisfied.
if err = c.startNextRun(app, status, schedule); err != nil {
return err
}
} else {
// Otherwise, check and update past runs.
if err = c.checkAndUpdatePastRuns(app, status); err != nil {
return err
}
}
// Check if the condition for starting the next run is satisfied.
ok, err := c.shouldStartNextRun(app)
if err != nil {
return err
}
} else {
// The next run is not due yet, check and update past runs.
if status.LastRunName != "" {
if err = c.checkAndUpdatePastRuns(app, status); err != nil {
if ok {
glog.Infof("Next run of ScheduledSparkApplication %s/%s is due, creating a new SparkApplication instance", app.Namespace, app.Name)
name, err := c.startNextRun(app, now)
if err != nil {
return err
}
status.LastRun = metav1.NewTime(now)
status.NextRun = metav1.NewTime(schedule.Next(status.LastRun.Time))
status.LastRunName = name
}
}

if err = c.checkAndUpdatePastRuns(app, status); err != nil {
return err
}
}

return c.updateScheduledSparkApplicationStatus(app, status)
Expand Down Expand Up @@ -257,7 +247,11 @@ func (c *Controller) createSparkApplication(
Name: scheduledApp.Name,
UID: scheduledApp.UID,
})
app.ObjectMeta.SetLabels(scheduledApp.GetLabels())
app.ObjectMeta.Labels = make(map[string]string)
for key, value := range scheduledApp.Labels {
app.ObjectMeta.Labels[key] = value
}
app.ObjectMeta.Labels[config.ScheduledSparkAppNameLabel] = scheduledApp.Name
_, err := c.crdClient.SparkoperatorV1alpha1().SparkApplications(scheduledApp.Namespace).Create(app)
if err != nil {
return "", err
Expand All @@ -266,69 +260,52 @@ func (c *Controller) createSparkApplication(
}

func (c *Controller) shouldStartNextRun(app *v1alpha1.ScheduledSparkApplication) (bool, error) {
sortedApps, err := c.listSparkApplications(app)
if err != nil {
return false, err
}
if len(sortedApps) == 0 {
return true, nil
}

// The last run (most recently started) is the first one in the sorted slice.
lastRun := sortedApps[0]
switch app.Spec.ConcurrencyPolicy {
case v1alpha1.ConcurrencyAllow:
return true, nil
case v1alpha1.ConcurrencyForbid:
finished, _, err := c.hasLastRunFinished(app.Namespace, app.Status.LastRunName)
if err != nil {
return false, err
}
return finished, nil
return c.hasLastRunFinished(lastRun), nil
case v1alpha1.ConcurrencyReplace:
if err := c.killLastRunIfNotFinished(app.Namespace, app.Status.LastRunName); err != nil {
if err := c.killLastRunIfNotFinished(lastRun); err != nil {
return false, err
}
return true, nil
}
return true, nil
}

func (c *Controller) startNextRun(
app *v1alpha1.ScheduledSparkApplication,
status *v1alpha1.ScheduledSparkApplicationStatus,
schedule cron.Schedule) error {
glog.Infof("Next run of ScheduledSparkApplication %s/%s is due, creating a new SparkApplication instance", app.Namespace, app.Name)
now := c.clock.Now()
func (c *Controller) startNextRun(app *v1alpha1.ScheduledSparkApplication, now time.Time) (string, error) {
name, err := c.createSparkApplication(app, now)
if err != nil {
glog.Errorf("failed to create a SparkApplication instance for ScheduledSparkApplication %s/%s: %v", app.Namespace, app.Name, err)
return err
return "", err
}

status.LastRun = metav1.NewTime(now)
status.NextRun = metav1.NewTime(schedule.Next(status.LastRun.Time))
status.LastRunName = name
return nil
return name, nil
}

func (c *Controller) hasLastRunFinished(
namespace string,
lastRunName string) (bool, *v1alpha1.SparkApplication, error) {
app, err := c.saLister.SparkApplications(namespace).Get(lastRunName)
if err != nil {
// The SparkApplication of the last run may have been deleted already (e.g., manually by the user).
if errors.IsNotFound(err) {
return true, nil, nil
}
return false, nil, err
}

func (c *Controller) hasLastRunFinished(app *v1alpha1.SparkApplication) bool {
return app.Status.AppState.State == v1alpha1.CompletedState ||
app.Status.AppState.State == v1alpha1.FailedState, app, nil
app.Status.AppState.State == v1alpha1.FailedState
}

func (c *Controller) killLastRunIfNotFinished(namespace string, lastRunName string) error {
finished, app, err := c.hasLastRunFinished(namespace, lastRunName)
if err != nil {
return err
}
if app == nil || finished {
func (c *Controller) killLastRunIfNotFinished(app *v1alpha1.SparkApplication) error {
finished := c.hasLastRunFinished(app)
if finished {
return nil
}

// Delete the SparkApplication object of the last run.
if err = c.crdClient.SparkoperatorV1alpha1().SparkApplications(namespace).Delete(lastRunName,
if err := c.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Delete(app.Name,
metav1.NewDeleteOptions(0)); err != nil {
return err
}
Expand All @@ -339,34 +316,29 @@ func (c *Controller) killLastRunIfNotFinished(namespace string, lastRunName stri
func (c *Controller) checkAndUpdatePastRuns(
app *v1alpha1.ScheduledSparkApplication,
status *v1alpha1.ScheduledSparkApplicationStatus) error {
lastRunName := status.LastRunName
lastRunApp, err := c.saLister.SparkApplications(app.Namespace).Get(lastRunName)
sortedApps, err := c.listSparkApplications(app)
if err != nil {
// The SparkApplication of the last run may have been deleted already (e.g., manually by the user).
if errors.IsNotFound(err) {
return nil
}
return err
}

var toDelete []string
if lastRunApp.Status.AppState.State == v1alpha1.CompletedState {
limit := 1
if app.Spec.SuccessfulRunHistoryLimit != nil {
limit = int(*app.Spec.SuccessfulRunHistoryLimit)
}
status.PastSuccessfulRunNames, toDelete = recordPastRuns(status.PastSuccessfulRunNames, lastRunName, limit)
} else if lastRunApp.Status.AppState.State == v1alpha1.FailedState {
limit := 1
if app.Spec.FailedRunHistoryLimit != nil {
limit = int(*app.Spec.FailedRunHistoryLimit)
var completedRuns []string
var failedRuns []string
for _, a := range sortedApps {
if a.Status.AppState.State == v1alpha1.CompletedState {
completedRuns = append(completedRuns, a.Name)
} else if a.Status.AppState.State == v1alpha1.FailedState {
failedRuns = append(failedRuns, a.Name)
}
status.PastFailedRunNames, toDelete = recordPastRuns(status.PastFailedRunNames, lastRunName, limit)
}

var toDelete []string
status.PastSuccessfulRunNames, toDelete = bookkeepPastRuns(completedRuns, app.Spec.SuccessfulRunHistoryLimit)
for _, name := range toDelete {
c.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Delete(name,
metav1.NewDeleteOptions(0))
c.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Delete(name, metav1.NewDeleteOptions(0))
}
status.PastFailedRunNames, toDelete = bookkeepPastRuns(failedRuns, app.Spec.FailedRunHistoryLimit)
for _, name := range toDelete {
c.crdClient.SparkoperatorV1alpha1().SparkApplications(app.Namespace).Delete(name, metav1.NewDeleteOptions(0))
}

return nil
Expand Down Expand Up @@ -400,19 +372,28 @@ func (c *Controller) updateScheduledSparkApplicationStatus(
})
}

func recordPastRuns(names []string, newName string, limit int) (updatedNames []string, toDelete []string) {
if len(names) > 0 && names[0] == newName {
// The name has already been recorded.
return names, nil
// listSparkApplications lists the SparkApplication objects that were launched
// on behalf of the given ScheduledSparkApplication, sorted per the ordering
// defined by the sparkApps type (most recently started first).
func (c *Controller) listSparkApplications(app *v1alpha1.ScheduledSparkApplication) (sparkApps, error) {
	// Runs are identified by the scheduled-app-name label set at creation time.
	selector := labels.Set{config.ScheduledSparkAppNameLabel: app.Name}.AsSelector()
	items, err := c.saLister.SparkApplications(app.Namespace).List(selector)
	if err != nil {
		return nil, fmt.Errorf("failed to list SparkApplications: %v", err)
	}
	runs := sparkApps(items)
	sort.Sort(runs)
	return runs, nil
}

rest := names
if len(names) >= limit {
rest = names[:limit-1]
toDelete = names[limit-1:]
// bookkeepPastRuns trims the given list of past run names down to the history
// limit. The first limit names are kept (the list is expected to be ordered
// most recently started first) and the remainder are returned for deletion.
// A nil runLimit defaults to keeping a single run.
func bookkeepPastRuns(names []string, runLimit *int32) (toKeep []string, toDelete []string) {
	limit := 1
	if runLimit != nil {
		limit = int(*runLimit)
	}

	if len(names) <= limit {
		// Nothing exceeds the limit; keep everything, delete nothing.
		return names, nil
	}
	// Keep the head of the list (most recent runs) and delete the tail.
	toKeep = names[:limit]
	toDelete = names[limit:]
	return
}

Expand Down
Loading

0 comments on commit 97d4b14

Please sign in to comment.