Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 65 additions & 28 deletions core/scheduler/service/job_sla_predictor_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/goto/optimus/config"
"github.com/goto/optimus/core/scheduler"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/lib/cron"
)

type SLABreachCause string
Expand Down Expand Up @@ -49,6 +50,10 @@ type SLAPredictorRepository interface {
GetPredictedSLAJobNamesWithinTimeRange(ctx context.Context, from, to time.Time) ([]scheduler.JobName, error)
}

// AirflowStatusGetter looks up job run statuses (from Airflow, going by the name
// and the log messages of its caller). The SLA predictor uses it to check whether
// a run that looks late has in fact already succeeded.
type AirflowStatusGetter interface {
// GetJobRuns returns the run statuses of the named job under tnnt that match
// jobQuery; jobCron is the job's parsed cron schedule.
// NOTE(review): how StartDate/EndDate bounds are interpreted (inclusive or
// not) is decided by the implementation, which is not visible here — confirm.
GetJobRuns(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)
}

type JobState struct {
JobSLAState
JobName scheduler.JobName
Expand All @@ -64,18 +69,19 @@ type JobSLAState struct {
}

// JobSLAPredictorService bundles the collaborators used to predict potential SLA
// breaches: a logger, the potential-SLA-breach config, the predictor repository,
// job detail / lineage / duration / tenant lookups, an Airflow run-status getter,
// and a notifier used for alerting.
type JobSLAPredictorService struct {
l log.Logger
config config.PotentialSLABreachConfig
repo SLAPredictorRepository
jobDetailsGetter JobDetailsGetter
jobLineageFetcher JobLineageFetcher
durationEstimator DurationEstimator
tenantGetter TenantGetter
l log.Logger
config config.PotentialSLABreachConfig
repo SLAPredictorRepository
jobDetailsGetter JobDetailsGetter
jobLineageFetcher JobLineageFetcher
durationEstimator DurationEstimator
tenantGetter TenantGetter
airflowStatusGetter AirflowStatusGetter
// alerting purpose
potentialSLANotifier PotentialSLANotifier
}

func NewJobSLAPredictorService(l log.Logger, config config.PotentialSLABreachConfig, slaPredictorRepo SLAPredictorRepository, jobLineageFetcher JobLineageFetcher, durationEstimator DurationEstimator, jobDetailsGetter JobDetailsGetter, potentialSLANotifier PotentialSLANotifier, tenantGetter TenantGetter) *JobSLAPredictorService {
func NewJobSLAPredictorService(l log.Logger, config config.PotentialSLABreachConfig, slaPredictorRepo SLAPredictorRepository, jobLineageFetcher JobLineageFetcher, durationEstimator DurationEstimator, jobDetailsGetter JobDetailsGetter, potentialSLANotifier PotentialSLANotifier, tenantGetter TenantGetter, airflowStatusGetter AirflowStatusGetter) *JobSLAPredictorService {
return &JobSLAPredictorService{
l: l,
config: config,
Expand All @@ -84,6 +90,7 @@ func NewJobSLAPredictorService(l log.Logger, config config.PotentialSLABreachCon
durationEstimator: durationEstimator,
jobDetailsGetter: jobDetailsGetter,
tenantGetter: tenantGetter,
airflowStatusGetter: airflowStatusGetter,
potentialSLANotifier: potentialSLANotifier,
}
}
Expand Down Expand Up @@ -160,7 +167,7 @@ func (s *JobSLAPredictorService) IdentifySLABreaches(ctx context.Context, projec
if !ok || targetedSLA == nil {
continue
}
breachesCauses, fullBreachesCauses := s.IdentifySLABreach(jobWithLineage, jobDurations, targetedSLA, skipJobNames, damperCoeff, reqConfig.ReferenceTime)
breachesCauses, fullBreachesCauses := s.IdentifySLABreach(ctx, jobWithLineage, jobDurations, targetedSLA, skipJobNames, damperCoeff, reqConfig.ReferenceTime)
// populate jobBreachCauses
if len(breachesCauses) > 0 {
jobBreachCauses[jobSchedule.JobName] = breachesCauses
Expand Down Expand Up @@ -202,7 +209,7 @@ func (s *JobSLAPredictorService) IdentifySLABreaches(ctx context.Context, projec
return jobBreachCauses, nil
}

func (s *JobSLAPredictorService) IdentifySLABreach(jobTarget *scheduler.JobLineageSummary, jobDurations map[scheduler.JobName]*time.Duration, targetedSLA *time.Time, skipJobNames map[scheduler.JobName]bool, damperCoeff float64, referenceTime time.Time) (map[scheduler.JobName]*JobState, map[scheduler.JobName][]*JobState) {
func (s *JobSLAPredictorService) IdentifySLABreach(ctx context.Context, jobTarget *scheduler.JobLineageSummary, jobDurations map[scheduler.JobName]*time.Duration, targetedSLA *time.Time, skipJobNames map[scheduler.JobName]bool, damperCoeff float64, referenceTime time.Time) (map[scheduler.JobName]*JobState, map[scheduler.JobName][]*JobState) {
// calculate inferred SLAs for each job based on their downstream critical jobs and estimated durations
// S(u|j) = S(j) - D(u)
jobSLAStatesByJobTarget := s.CalculateInferredSLAs(jobTarget, jobDurations, targetedSLA, damperCoeff)
Expand All @@ -213,7 +220,7 @@ func (s *JobSLAPredictorService) IdentifySLABreach(jobTarget *scheduler.JobLinea
// identify jobs that might breach their SLAs based on current time and inferred SLAs
// T(now)>= S(u|j) and the job u has not completed yet
// T(now)>= S(u|j) - D(u) and the job u has not started yet
rootCauses, allUpstreamStates := s.identifySLABreachRootCauses(jobTarget, jobSLAStates, skipJobNames, referenceTime)
rootCauses, allUpstreamStates := s.identifySLABreachRootCauses(ctx, jobTarget, jobSLAStates, skipJobNames, referenceTime)

// populate breachesCauses
breachesCauses := make(map[scheduler.JobName]*JobState)
Expand Down Expand Up @@ -432,7 +439,7 @@ func (s *JobSLAPredictorService) CalculateInferredSLAs(jobTarget *scheduler.JobL
// - Given current time in UTC T(now), T(now)>= S(u|j) (the inferred SLA for u induced by j has passed) and the upstream job u has not completed yet. Or,
// - Given current time in UTC T(now), T(now)>= S(u|j) - D(u) (the inferred SLA for u induced by j minus the average duration of u has passed) and the upstream job u has not started yet.
// return the job that might breach its SLA
func (s *JobSLAPredictorService) identifySLABreachRootCauses(jobTarget *scheduler.JobLineageSummary, jobSLAStates map[scheduler.JobName]*JobSLAState, skipJobNames map[scheduler.JobName]bool, referenceTime time.Time) ([][]*JobState, [][]*JobState) {
func (s *JobSLAPredictorService) identifySLABreachRootCauses(ctx context.Context, jobTarget *scheduler.JobLineageSummary, jobSLAStates map[scheduler.JobName]*JobSLAState, skipJobNames map[scheduler.JobName]bool, referenceTime time.Time) ([][]*JobState, [][]*JobState) {
jobBreachStates := make(map[scheduler.JobName]*JobState)
allUpstreamStates := make([][]*JobState, 0)

Expand Down Expand Up @@ -494,27 +501,35 @@ func (s *JobSLAPredictorService) identifySLABreachRootCauses(jobTarget *schedule
var state *JobState
// condition 1: T(now)>= S(u|j) and the job u has not completed yet
if (referenceTime.After(inferredSLA) && jobRun != nil && jobRun.JobEndTime == nil) || (jobRun != nil && jobRun.JobEndTime != nil && jobRun.JobEndTime.After(inferredSLA)) {
// add to jobStatePaths
state = &JobState{
JobSLAState: *jobSLAStates[job.JobName],
JobName: job.JobName,
JobRun: *jobRun,
Tenant: job.Tenant,
RelativeLevel: jobWithState.level,
Status: SLABreachCauseRunningLate,
if !s.isJobRunSuccess(ctx, job, jobRun) {
// add to jobStatePaths
state = &JobState{
JobSLAState: *jobSLAStates[job.JobName],
JobName: job.JobName,
JobRun: *jobRun,
Tenant: job.Tenant,
RelativeLevel: jobWithState.level,
Status: SLABreachCauseRunningLate,
}
} else {
s.l.Info("skipping job for SLA breach as the job run has completed successfully", "job", job.JobName)
}
}

// condition 2: T(now)>= S(u|j) - D(u) and the job u has not started yet
if referenceTime.After(inferredSLA.Add(-estimatedDuration)) && (jobRun != nil && jobRun.TaskStartTime == nil) {
// add to jobStatePaths
state = &JobState{
JobSLAState: *jobSLAStates[job.JobName],
JobName: job.JobName,
JobRun: *jobRun,
Tenant: job.Tenant,
RelativeLevel: jobWithState.level,
Status: SLABreachCauseNotStarted,
if !s.isJobRunSuccess(ctx, job, jobRun) {
// add to jobStatePaths
state = &JobState{
JobSLAState: *jobSLAStates[job.JobName],
JobName: job.JobName,
JobRun: *jobRun,
Tenant: job.Tenant,
RelativeLevel: jobWithState.level,
Status: SLABreachCauseNotStarted,
}
} else {
s.l.Info("skipping job for SLA breach as the job run has completed successfully", "job", job.JobName)
}
}

Expand Down Expand Up @@ -564,6 +579,28 @@ func (s *JobSLAPredictorService) identifySLABreachRootCauses(jobTarget *schedule
return rootCauses, compactedAllUpstreamStates
}

// isJobRunSuccess reports whether Airflow records the run of job scheduled at
// jobRun.ScheduledAt as successful.
//
// It returns true only when the lookup succeeds and the first returned run
// status has State == scheduler.StateSuccess. Every other outcome returns
// false, which means the caller keeps treating the run as a potential breach:
//   - job or jobRun is nil,
//   - the job's schedule interval cannot be parsed as a cron expression,
//   - the Airflow lookup fails,
//   - Airflow returns no (or a nil) run status for that schedule time.
//
// Failures are logged here and never propagated, so the check is best-effort.
func (s *JobSLAPredictorService) isJobRunSuccess(ctx context.Context, job *scheduler.JobLineageSummary, jobRun *scheduler.JobRunSummary) bool {
	if job == nil || jobRun == nil {
		return false
	}

	jobCron, err := cron.ParseCronSchedule(job.ScheduleInterval)
	if err != nil {
		// Returning false does not skip the breach check; it only means the run
		// status could not be verified, so the job stays flagged.
		s.l.Error("failed to parse cron schedule for job, unable to verify job run status from Airflow", "job", job.JobName, "error", err)
		return false
	}

	// Start and end are both the run's scheduled time so that only that single
	// run is requested.
	criteria := &scheduler.JobRunsCriteria{
		Name:      job.JobName.String(),
		StartDate: jobRun.ScheduledAt,
		EndDate:   jobRun.ScheduledAt,
	}
	runStatuses, err := s.airflowStatusGetter.GetJobRuns(ctx, job.Tenant, criteria, jobCron)
	if err != nil {
		s.l.Error("failed to get job run status from Airflow", "job", jobRun.JobName, "scheduled_at", jobRun.ScheduledAt, "error", err)
		return false
	}

	// An empty result is not a lookup failure; log it separately instead of
	// reporting a nil error. The slice holds pointers, so guard the first entry
	// before dereferencing it.
	if len(runStatuses) == 0 || runStatuses[0] == nil {
		s.l.Warn("no job run status found in Airflow", "job", jobRun.JobName, "scheduled_at", jobRun.ScheduledAt)
		return false
	}

	// There should be only one run status for the given scheduled time.
	return runStatuses[0].State == scheduler.StateSuccess
}

// populateJobSLAStates populates the jobSLAStatesByJobName map with the estimated durations and inferred SLAs for each job.
func (*JobSLAPredictorService) populateJobSLAStates(jobDurations map[scheduler.JobName]*time.Duration, jobSLAsByJobName map[scheduler.JobName]*time.Time) map[scheduler.JobName]*JobSLAState {
jobSLAStatesByJobName := make(map[scheduler.JobName]*JobSLAState)
Expand Down
Loading
Loading