diff --git a/core/scheduler/service/job_sla_predictor_service.go b/core/scheduler/service/job_sla_predictor_service.go index ad8eb73edb..5078fcf8f0 100644 --- a/core/scheduler/service/job_sla_predictor_service.go +++ b/core/scheduler/service/job_sla_predictor_service.go @@ -10,6 +10,7 @@ import ( "github.com/goto/optimus/config" "github.com/goto/optimus/core/scheduler" "github.com/goto/optimus/core/tenant" + "github.com/goto/optimus/internal/lib/cron" ) type SLABreachCause string @@ -49,6 +50,10 @@ type SLAPredictorRepository interface { GetPredictedSLAJobNamesWithinTimeRange(ctx context.Context, from, to time.Time) ([]scheduler.JobName, error) } +type AirflowStatusGetter interface { + GetJobRuns(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) +} + type JobState struct { JobSLAState JobName scheduler.JobName @@ -64,18 +69,19 @@ type JobSLAState struct { } type JobSLAPredictorService struct { - l log.Logger - config config.PotentialSLABreachConfig - repo SLAPredictorRepository - jobDetailsGetter JobDetailsGetter - jobLineageFetcher JobLineageFetcher - durationEstimator DurationEstimator - tenantGetter TenantGetter + l log.Logger + config config.PotentialSLABreachConfig + repo SLAPredictorRepository + jobDetailsGetter JobDetailsGetter + jobLineageFetcher JobLineageFetcher + durationEstimator DurationEstimator + tenantGetter TenantGetter + airflowStatusGetter AirflowStatusGetter // alerting purpose potentialSLANotifier PotentialSLANotifier } -func NewJobSLAPredictorService(l log.Logger, config config.PotentialSLABreachConfig, slaPredictorRepo SLAPredictorRepository, jobLineageFetcher JobLineageFetcher, durationEstimator DurationEstimator, jobDetailsGetter JobDetailsGetter, potentialSLANotifier PotentialSLANotifier, tenantGetter TenantGetter) *JobSLAPredictorService { +func NewJobSLAPredictorService(l log.Logger, config config.PotentialSLABreachConfig, slaPredictorRepo SLAPredictorRepository, jobLineageFetcher JobLineageFetcher, durationEstimator DurationEstimator, jobDetailsGetter JobDetailsGetter, potentialSLANotifier PotentialSLANotifier, tenantGetter TenantGetter, airflowStatusGetter AirflowStatusGetter) *JobSLAPredictorService { return &JobSLAPredictorService{ l: l, config: config, @@ -84,6 +90,7 @@ func NewJobSLAPredictorService(l log.Logger, config config.PotentialSLABreachCon durationEstimator: durationEstimator, jobDetailsGetter: jobDetailsGetter, tenantGetter: tenantGetter, + airflowStatusGetter: airflowStatusGetter, potentialSLANotifier: potentialSLANotifier, } } @@ -160,7 +167,7 @@ func (s *JobSLAPredictorService) IdentifySLABreaches(ctx context.Context, projec if !ok || targetedSLA == nil { continue } - breachesCauses, fullBreachesCauses := s.IdentifySLABreach(jobWithLineage, jobDurations, targetedSLA, skipJobNames, damperCoeff, reqConfig.ReferenceTime) + breachesCauses, fullBreachesCauses := s.IdentifySLABreach(ctx, jobWithLineage, jobDurations, targetedSLA, skipJobNames, damperCoeff, reqConfig.ReferenceTime) // populate jobBreachCauses if len(breachesCauses) > 0 { jobBreachCauses[jobSchedule.JobName] = breachesCauses @@ -202,7 +209,7 @@ func (s *JobSLAPredictorService) IdentifySLABreaches(ctx context.Context, projec return jobBreachCauses, nil } -func (s *JobSLAPredictorService) IdentifySLABreach(jobTarget *scheduler.JobLineageSummary, jobDurations map[scheduler.JobName]*time.Duration, targetedSLA *time.Time, skipJobNames map[scheduler.JobName]bool, damperCoeff float64, referenceTime time.Time) (map[scheduler.JobName]*JobState, map[scheduler.JobName][]*JobState) { +func (s *JobSLAPredictorService) IdentifySLABreach(ctx context.Context, jobTarget *scheduler.JobLineageSummary, jobDurations map[scheduler.JobName]*time.Duration, targetedSLA *time.Time, skipJobNames map[scheduler.JobName]bool, damperCoeff float64, referenceTime time.Time) (map[scheduler.JobName]*JobState, map[scheduler.JobName][]*JobState) { // calculate inferred SLAs for each job based on their downstream critical jobs and estimated durations // S(u|j) = S(j) - D(u) jobSLAStatesByJobTarget := s.CalculateInferredSLAs(jobTarget, jobDurations, targetedSLA, damperCoeff) @@ -213,7 +220,7 @@ func (s *JobSLAPredictorService) IdentifySLABreach(jobTarget *scheduler.JobLinea // identify jobs that might breach their SLAs based on current time and inferred SLAs // T(now)>= S(u|j) and the job u has not completed yet // T(now)>= S(u|j) - D(u) and the job u has not started yet - rootCauses, allUpstreamStates := s.identifySLABreachRootCauses(jobTarget, jobSLAStates, skipJobNames, referenceTime) + rootCauses, allUpstreamStates := s.identifySLABreachRootCauses(ctx, jobTarget, jobSLAStates, skipJobNames, referenceTime) // populate breachesCauses breachesCauses := make(map[scheduler.JobName]*JobState) @@ -432,7 +439,7 @@ func (s *JobSLAPredictorService) CalculateInferredSLAs(jobTarget *scheduler.JobL // - Given current time in UTC T(now), T(now)>= S(u|j) (the inferred SLA for u induced by j has passed) and the upstream job u has not completed yet. Or, // - Given current time in UTC T(now), T(now)>= S(u|j) - D(u) (the inferred SLA for u induced by j minus the average duration of u has passed) and the upstream job u has not started yet. // return the job that might breach its SLA -func (s *JobSLAPredictorService) identifySLABreachRootCauses(jobTarget *scheduler.JobLineageSummary, jobSLAStates map[scheduler.JobName]*JobSLAState, skipJobNames map[scheduler.JobName]bool, referenceTime time.Time) ([][]*JobState, [][]*JobState) { +func (s *JobSLAPredictorService) identifySLABreachRootCauses(ctx context.Context, jobTarget *scheduler.JobLineageSummary, jobSLAStates map[scheduler.JobName]*JobSLAState, skipJobNames map[scheduler.JobName]bool, referenceTime time.Time) ([][]*JobState, [][]*JobState) { jobBreachStates := make(map[scheduler.JobName]*JobState) allUpstreamStates := make([][]*JobState, 0) @@ -494,27 +501,35 @@ func (s *JobSLAPredictorService) identifySLABreachRootCauses(jobTarget *schedule var state *JobState // condition 1: T(now)>= S(u|j) and the job u has not completed yet if (referenceTime.After(inferredSLA) && jobRun != nil && jobRun.JobEndTime == nil) || (jobRun != nil && jobRun.JobEndTime != nil && jobRun.JobEndTime.After(inferredSLA)) { - // add to jobStatePaths - state = &JobState{ - JobSLAState: *jobSLAStates[job.JobName], - JobName: job.JobName, - JobRun: *jobRun, - Tenant: job.Tenant, - RelativeLevel: jobWithState.level, - Status: SLABreachCauseRunningLate, + if !s.isJobRunSuccess(ctx, job, jobRun) { + // add to jobStatePaths + state = &JobState{ + JobSLAState: *jobSLAStates[job.JobName], + JobName: job.JobName, + JobRun: *jobRun, + Tenant: job.Tenant, + RelativeLevel: jobWithState.level, + Status: SLABreachCauseRunningLate, + } + } else { + s.l.Info("skipping job for SLA breach as the job run has completed successfully", "job", job.JobName) } } // condition 2: T(now)>= S(u|j) - D(u) and the job u has not started yet if referenceTime.After(inferredSLA.Add(-estimatedDuration)) && (jobRun != nil && jobRun.TaskStartTime == nil) { - // add to jobStatePaths - state = &JobState{ - JobSLAState: *jobSLAStates[job.JobName], - JobName: job.JobName, - JobRun: *jobRun, - Tenant: job.Tenant, - RelativeLevel: jobWithState.level, - Status: SLABreachCauseNotStarted, + if !s.isJobRunSuccess(ctx, job, jobRun) { + // add to jobStatePaths + state = &JobState{ + JobSLAState: *jobSLAStates[job.JobName], + JobName: job.JobName, + JobRun: *jobRun, + Tenant: job.Tenant, + RelativeLevel: jobWithState.level, + Status: SLABreachCauseNotStarted, + } + } else { + s.l.Info("skipping job for SLA breach as the job run has completed successfully", "job", job.JobName) } } @@ -564,6 +579,28 @@ func (s *JobSLAPredictorService) identifySLABreachRootCauses(jobTarget *schedule return rootCauses, compactedAllUpstreamStates } +func (s *JobSLAPredictorService) isJobRunSuccess(ctx context.Context, job *scheduler.JobLineageSummary, jobRun *scheduler.JobRunSummary) bool { + if jobRun == nil { + return false + } + jobCron, err := cron.ParseCronSchedule(job.ScheduleInterval) + if err != nil { + s.l.Error("failed to parse cron schedule for job, skipping SLA breach check for running late condition", "job", job.JobName, "error", err) + return false + } + runStatuses, err := s.airflowStatusGetter.GetJobRuns(ctx, job.Tenant, &scheduler.JobRunsCriteria{ + Name: job.JobName.String(), + StartDate: jobRun.ScheduledAt, + EndDate: jobRun.ScheduledAt, + }, jobCron) + if err != nil || len(runStatuses) == 0 { + s.l.Error("failed to get job run status from Airflow", "job", jobRun.JobName, "scheduled_at", jobRun.ScheduledAt, "error", err) + return false + } + jobRunStatus := runStatuses[0] // there should be only one run status for the given scheduled at + return jobRunStatus.State == scheduler.StateSuccess +} + // populateJobSLAStates populates the jobSLAStatesByJobName map with the estimated durations and inferred SLAs for each job. func (*JobSLAPredictorService) populateJobSLAStates(jobDurations map[scheduler.JobName]*time.Duration, jobSLAsByJobName map[scheduler.JobName]*time.Time) map[scheduler.JobName]*JobSLAState { jobSLAStatesByJobName := make(map[scheduler.JobName]*JobSLAState) diff --git a/core/scheduler/service/job_sla_predictor_service_test.go b/core/scheduler/service/job_sla_predictor_service_test.go index a1fb528b14..04780e7cd6 100644 --- a/core/scheduler/service/job_sla_predictor_service_test.go +++ b/core/scheduler/service/job_sla_predictor_service_test.go @@ -14,6 +14,7 @@ import ( "github.com/goto/optimus/core/scheduler" "github.com/goto/optimus/core/scheduler/service" "github.com/goto/optimus/core/tenant" + "github.com/goto/optimus/internal/lib/cron" ) func TestIdentifySLABreaches(t *testing.T) { @@ -29,7 +30,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -56,7 +58,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -87,7 +90,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -118,7 +122,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -149,7 +154,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -189,7 +195,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -233,7 +240,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -287,7 +295,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -343,7 +352,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -424,7 +434,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -544,7 +555,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -638,6 +650,26 @@ func TestIdentifySLABreaches(t *testing.T) { "job-B": func() *time.Duration { d := 15 * time.Minute; return &d }(), "job-C": func() *time.Duration { d := 10 * time.Minute; return &d }(), }, nil).Once() + airflowStatusGetter.On("GetJobRuns", ctx, mock.Anything, &scheduler.JobRunsCriteria{ + Name: "job-B", + StartDate: scheduledAt.Add(-15 * time.Minute), + EndDate: scheduledAt.Add(-15 * time.Minute), + }, mock.Anything).Return([]*scheduler.JobRunStatus{ + { + ScheduledAt: scheduledAt.Add(-15 * time.Minute), + State: scheduler.StatePending, + }, + }, nil).Once() + airflowStatusGetter.On("GetJobRuns", ctx, mock.Anything, &scheduler.JobRunsCriteria{ + Name: "job-C", + StartDate: scheduledAt.Add(-25 * time.Minute), + EndDate: scheduledAt.Add(-25 * time.Minute), + }, mock.Anything).Return([]*scheduler.JobRunStatus{ + { + ScheduledAt: scheduledAt.Add(-25 * time.Minute), + State: scheduler.StateRunning, + }, + }, nil).Once() // when jobBreachRootCause, err := jobSLAPredictorService.IdentifySLABreaches(ctx, projectName, jobNames, labels, reqConfig) @@ -666,7 +698,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -763,6 +796,12 @@ func TestIdentifySLABreaches(t *testing.T) { "job-B": func() *time.Duration { d := 15 * time.Minute; return &d }(), "job-C": func() *time.Duration { d := 10 * time.Minute; return &d }(), }, nil).Once() + airflowStatusGetter.On("GetJobRuns", ctx, mock.Anything, mock.Anything, mock.Anything).Return([]*scheduler.JobRunStatus{ + { + ScheduledAt: scheduledAt.Add(-15 * time.Minute), + State: scheduler.StatePending, + }, + }, nil).Once() // when jobBreachRootCause, err := jobSLAPredictorService.IdentifySLABreaches(ctx, projectName, jobNames, labels, reqConfig) @@ -793,7 +832,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -927,6 +967,26 @@ func TestIdentifySLABreaches(t *testing.T) { "job-B": func() *time.Duration { d := 15 * time.Minute; return &d }(), "job-C": func() *time.Duration { d := 10 * time.Minute; return &d }(), }, nil).Once() + airflowStatusGetter.On("GetJobRuns", ctx, mock.Anything, &scheduler.JobRunsCriteria{ + Name: "job-B", + StartDate: scheduledAt.Add(-15 * time.Minute), + EndDate: scheduledAt.Add(-15 * time.Minute), + }, mock.Anything).Return([]*scheduler.JobRunStatus{ + { + ScheduledAt: scheduledAt.Add(-15 * time.Minute), + State: scheduler.StatePending, + }, + }, nil).Once() + airflowStatusGetter.On("GetJobRuns", ctx, mock.Anything, &scheduler.JobRunsCriteria{ + Name: "job-C", + StartDate: scheduledAt.Add(-25 * time.Minute), + EndDate: scheduledAt.Add(-25 * time.Minute), + }, mock.Anything).Return([]*scheduler.JobRunStatus{ + { + ScheduledAt: scheduledAt.Add(-25 * time.Minute), + State: scheduler.StateRunning, + }, + }, nil).Once() // when jobBreachRootCause, err := jobSLAPredictorService.IdentifySLABreaches(ctx, projectName, jobNames, labels, reqConfig) @@ -957,7 +1017,8 @@ func TestIdentifySLABreaches(t *testing.T) { jobLineageFetcher := NewJobLineageFetcher(t) durationEstimator := NewDurationEstimator(t) jobDetailsGetter := NewJobDetailsGetter(t) - jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil) + airflowStatusGetter := NewAirflowStatusGetter(t) + jobSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, jobLineageFetcher, durationEstimator, jobDetailsGetter, nil, nil, airflowStatusGetter) projectName := tenant.ProjectName("project-a") nextScheduledRangeInHours := 10 * time.Hour @@ -1009,7 +1070,7 @@ func TestIdentifySLABreaches(t *testing.T) { JobRuns: map[scheduler.JobName]*scheduler.JobRunSummary{ jobASchedule.JobName: { ScheduledAt: scheduledAt, - TaskStartTime: &jobATaskStartTime, + TaskStartTime: &jobATaskStartTime, // job-A is running late }, }, Upstreams: []*scheduler.JobLineageSummary{}, @@ -1062,6 +1123,12 @@ func TestIdentifySLABreaches(t *testing.T) { "job-B": func() *time.Duration { d := 15 * time.Minute; return &d }(), "job-C": func() *time.Duration { d := 10 * time.Minute; return &d }(), }, nil).Once() + airflowStatusGetter.On("GetJobRuns", ctx, mock.Anything, mock.Anything, mock.Anything).Return([]*scheduler.JobRunStatus{ + { + ScheduledAt: scheduledAt, + State: scheduler.StateRunning, + }, + }, nil).Once() // when jobBreachRootCause, err := jobSLAPredictorService.IdentifySLABreaches(ctx, projectName, jobNames, labels, reqConfig) @@ -1082,8 +1149,15 @@ func TestIdentifySLABreach(t *testing.T) { DamperCoeff: 1.0, EnablePersistentLogging: false, } + ctx := context.Background() + airflowStatusGetter := NewAirflowStatusGetter(t) + airflowStatusGetter.On("GetJobRuns", ctx, mock.Anything, mock.Anything, mock.Anything).Return([]*scheduler.JobRunStatus{ + { + State: scheduler.StateRunning, + }, + }, nil).Maybe() - slaPredictorService := service.NewJobSLAPredictorService(l, conf, nil, nil, nil, nil, nil, nil) + slaPredictorService := service.NewJobSLAPredictorService(l, conf, nil, nil, nil, nil, nil, nil, airflowStatusGetter) t.Run("given job lineage with no upstream issues", func(t *testing.T) { referenceTime := time.Now().UTC() @@ -1100,7 +1174,7 @@ func TestIdentifySLABreach(t *testing.T) { jobTargetLineage := jobTargetLineageMap["job-A"] - breachCauses, allUpstreamStates := slaPredictorService.IdentifySLABreach(jobTargetLineage, durations, &targetSLA, map[scheduler.JobName]bool{}, 1.0, referenceTime) + breachCauses, allUpstreamStates := slaPredictorService.IdentifySLABreach(ctx, jobTargetLineage, durations, &targetSLA, map[scheduler.JobName]bool{}, 1.0, referenceTime) assert.Len(t, breachCauses, 0) assert.Len(t, allUpstreamStates, 0) @@ -1139,7 +1213,7 @@ func TestIdentifySLABreach(t *testing.T) { skipJobNames := map[scheduler.JobName]bool{} - breachesCauses, fullBreachesCauses := slaPredictorService.IdentifySLABreach(jobTargetLineage, durations, &targetSLA, skipJobNames, 1.0, referenceTime) + breachesCauses, fullBreachesCauses := slaPredictorService.IdentifySLABreach(ctx, jobTargetLineage, durations, &targetSLA, skipJobNames, 1.0, referenceTime) assert.Len(t, breachesCauses, 1) assert.Equal(t, breachesCauses["job-C"].Status, service.SLABreachCauseRunningLate) @@ -1148,6 +1222,53 @@ func TestIdentifySLABreach(t *testing.T) { assert.Len(t, fullBreachesCauses["job-C"], 3) }) + t.Run("given job lineage with 1 upstreams running late, but it marked success manually, return no breach", func(t *testing.T) { + // job-C -> job-B -> job-A + // is the target job + // job-C is running late, so job-A might breach its SLA, but it is marked as success manually + + // | job | estimated duration | + // |-----|--------------------| + // | A | 20 mins | -> targeted SLA 30 mins from now + // | B | 15 mins | inferredSLA: now+10 mins, must start: now-5 mins + // | C | 10 mins | inferredSLA: now-5 mins, must start: now-15 mins + + referenceTime := time.Now().UTC() + targetedSLAOffset := 30 * time.Minute + targetSLA := referenceTime.Add(targetedSLAOffset) + durations := map[scheduler.JobName]*time.Duration{ + "job-A": func() *time.Duration { d := 20 * time.Minute; return &d }(), + "job-B": func() *time.Duration { d := 15 * time.Minute; return &d }(), + "job-C": func() *time.Duration { d := 10 * time.Minute; return &d }(), + } + jobNames := []scheduler.JobName{"job-A", "job-B", "job-C"} + + jobTargetLineageMap := generateLineageWithSLAStates(slaPredictorService, durations, jobNames, referenceTime, 1.0, targetSLA) + + jobTargetLineage := jobTargetLineageMap["job-A"] + + jobTargetLineageMap["job-B"].JobRuns["job-A"].TaskStartTime = nil + jobTargetLineageMap["job-B"].JobRuns["job-A"].TaskEndTime = nil + jobTargetLineageMap["job-B"].JobRuns["job-A"].JobEndTime = nil + jobTargetLineageMap["job-C"].JobRuns["job-A"].TaskEndTime = nil + jobTargetLineageMap["job-C"].JobRuns["job-A"].JobEndTime = nil + + skipJobNames := map[scheduler.JobName]bool{} + + currentAirflowStatusGetter := NewAirflowStatusGetter(t) + currentAirflowStatusGetter.On("GetJobRuns", ctx, mock.Anything, mock.Anything, mock.Anything).Return([]*scheduler.JobRunStatus{ + { + State: scheduler.StateSuccess, + }, + }, nil).Maybe() + + currentSLAPredictorService := service.NewJobSLAPredictorService(l, conf, nil, nil, nil, nil, nil, nil, currentAirflowStatusGetter) + breachesCauses, fullBreachesCauses := currentSLAPredictorService.IdentifySLABreach(ctx, jobTargetLineage, durations, &targetSLA, skipJobNames, 1.0, referenceTime) + + assert.Len(t, breachesCauses, 0) + assert.Len(t, fullBreachesCauses, 0) + }) + t.Run("given long lineage 20 levels deep, with dampering factor 1.0, return breach causes on job-18", func(t *testing.T) { referenceTime := time.Now().UTC() targetedSLAOffset := 15 * 5 * time.Minute @@ -1177,7 +1298,7 @@ func TestIdentifySLABreach(t *testing.T) { skipJobNames := map[scheduler.JobName]bool{} - breachesCauses, fullBreachesCauses := slaPredictorService.IdentifySLABreach(jobTargetLineage, durations, &targetSLA, skipJobNames, damperCoeff, referenceTime) + breachesCauses, fullBreachesCauses := slaPredictorService.IdentifySLABreach(ctx, jobTargetLineage, durations, &targetSLA, skipJobNames, damperCoeff, referenceTime) assert.Len(t, breachesCauses, 1) assert.Equal(t, breachesCauses["job-18"].Status, service.SLABreachCauseRunningLate) @@ -1215,7 +1336,7 @@ func TestIdentifySLABreach(t *testing.T) { skipJobNames := map[scheduler.JobName]bool{} - breachesCauses, fullBreachesCauses := slaPredictorService.IdentifySLABreach(jobTargetLineage, durations, &targetSLA, skipJobNames, damperCoeff, referenceTime) + breachesCauses, fullBreachesCauses := slaPredictorService.IdentifySLABreach(ctx, jobTargetLineage, durations, &targetSLA, skipJobNames, damperCoeff, referenceTime) assert.Len(t, breachesCauses, 0) assert.Len(t, fullBreachesCauses, 0) @@ -1514,3 +1635,53 @@ func NewJobDetailsGetter(t interface { return mock } + +// AirflowStatusGetter is an autogenerated mock type for the AirflowStatusGetter type +type AirflowStatusGetter struct { + mock.Mock +} + +// GetJobRuns provides a mock function with given fields: ctx, tnnt, jobQuery, jobCron +func (_m *AirflowStatusGetter) GetJobRuns(ctx context.Context, tnnt tenant.Tenant, jobQuery *scheduler.JobRunsCriteria, jobCron *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error) { + ret := _m.Called(ctx, tnnt, jobQuery, jobCron) + + if len(ret) == 0 { + panic("no return value specified for GetJobRuns") + } + + var r0 []*scheduler.JobRunStatus + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, tenant.Tenant, *scheduler.JobRunsCriteria, *cron.ScheduleSpec) ([]*scheduler.JobRunStatus, error)); ok { + return rf(ctx, tnnt, jobQuery, jobCron) + } + if rf, ok := ret.Get(0).(func(context.Context, tenant.Tenant, *scheduler.JobRunsCriteria, *cron.ScheduleSpec) []*scheduler.JobRunStatus); ok { + r0 = rf(ctx, tnnt, jobQuery, jobCron) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*scheduler.JobRunStatus) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, tenant.Tenant, *scheduler.JobRunsCriteria, *cron.ScheduleSpec) error); ok { + r1 = rf(ctx, tnnt, jobQuery, jobCron) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewAirflowStatusGetter creates a new instance of AirflowStatusGetter. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewAirflowStatusGetter(t interface { + mock.TestingT + Cleanup(func()) +}, +) *AirflowStatusGetter { + mock := &AirflowStatusGetter{} + mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/server/optimus.go b/server/optimus.go index 0d41b2b1eb..2075789690 100644 --- a/server/optimus.go +++ b/server/optimus.go @@ -444,7 +444,7 @@ func (s *OptimusServer) setupHandlers() error { s.conf.Alerting.PotentialSLABreachConfig.DurationEstimatorConfig.PaddingPercentage, s.conf.Alerting.PotentialSLABreachConfig.DurationEstimatorConfig.MinPaddingMinutes, s.conf.Alerting.PotentialSLABreachConfig.DurationEstimatorConfig.MaxPaddingMinutes) - newJobSLAPredictorService := schedulerService.NewJobSLAPredictorService(s.logger, s.conf.Alerting.PotentialSLABreachConfig, slaRepository, jobLineageService, newDurationEstimatorService, jobProviderRepo, alertsHandler, tenantService) + newJobSLAPredictorService := schedulerService.NewJobSLAPredictorService(s.logger, s.conf.Alerting.PotentialSLABreachConfig, slaRepository, jobLineageService, newDurationEstimatorService, jobProviderRepo, alertsHandler, tenantService, newScheduler) // Resource Bounded Context primaryResourceService := rService.NewResourceService(s.logger, resourceRepository, jJobService, resourceManager, s.eventHandler, jJobService, alertsHandler, tenantService, newEngine, syncer, syncStatusRepository)