Skip to content

Commit 79842e0

Browse files
authored
feat: add result for monitoring on job run (#742)
* feat: update dag template and lib to pull result for monitoring when job ends
* feat: add migration to add monitoring column for job run
* feat: update job run repository to read from monitoring column
* feat: add update monitoring on job run repo
* feat: add update monitoring on job run service
1 parent 3a9efe9 commit 79842e0

File tree

10 files changed

+118
-22
lines changed

10 files changed

+118
-22
lines changed

core/scheduler/job_run.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ type JobRun struct {
4141
StartTime time.Time
4242
SLAAlert bool
4343
EndTime time.Time
44+
45+
Monitoring map[string]any
4446
}
4547

4648
type OperatorRun struct {

core/scheduler/service/job_run_service.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type JobRunRepository interface {
2626
Create(ctx context.Context, tenant tenant.Tenant, name scheduler.JobName, scheduledAt time.Time, slaDefinitionInSec int64) error
2727
Update(ctx context.Context, jobRunID uuid.UUID, endTime time.Time, jobRunStatus scheduler.State) error
2828
UpdateSLA(ctx context.Context, slaObjects []*scheduler.SLAObject) error
29+
UpdateMonitoring(ctx context.Context, jobRunID uuid.UUID, monitoring map[string]any) error
2930
}
3031

3132
type OperatorRunRepository interface {
@@ -230,7 +231,19 @@ func (s JobRunService) updateJobRun(ctx context.Context, event scheduler.Event)
230231
if err != nil {
231232
return err
232233
}
233-
return s.repo.Update(ctx, jobRun.ID, event.EventTime, event.Status)
234+
if err := s.repo.Update(ctx, jobRun.ID, event.EventTime, event.Status); err != nil {
235+
return err
236+
}
237+
monitoringValues := s.getMonitoringValues(event)
238+
return s.repo.UpdateMonitoring(ctx, jobRun.ID, monitoringValues)
239+
}
240+
241+
func (JobRunService) getMonitoringValues(event scheduler.Event) map[string]any {
242+
var output map[string]any
243+
if value, ok := event.Values["monitoring"]; ok && value != nil {
244+
output, _ = value.(map[string]any)
245+
}
246+
return output
234247
}
235248

236249
func (s JobRunService) updateJobRunSLA(ctx context.Context, event scheduler.Event) error {

core/scheduler/service/job_run_service_test.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ func TestJobRunService(t *testing.T) {
2727
scheduledAtString := "2022-01-02T15:04:05Z"
2828
scheduledAtTimeStamp, _ := time.Parse(scheduler.ISODateFormat, scheduledAtString)
2929
logger := log.NewNoop()
30+
31+
monitoring := map[string]any{
32+
"slot_millis": float64(5000),
33+
"total_bytes_processed": float64(2500),
34+
}
3035
t.Run("UpdateJobState", func(t *testing.T) {
3136
tnnt, _ := tenant.NewTenant(projName.String(), namespaceName.String())
3237

@@ -147,7 +152,8 @@ func TestJobRunService(t *testing.T) {
147152
OperatorName: "some_dummy_name",
148153
JobScheduledAt: scheduledAtTimeStamp,
149154
Values: map[string]any{
150-
"status": "success",
155+
"status": "success",
156+
"monitoring": monitoring,
151157
},
152158
}
153159

@@ -167,6 +173,7 @@ func TestJobRunService(t *testing.T) {
167173
jobRunRepo.On("Create", ctx, tnnt, jobName, scheduledAtTimeStamp, slaDefinitionInSec).Return(nil)
168174
jobRunRepo.On("GetByScheduledAt", ctx, tnnt, jobName, scheduledAtTimeStamp).Return(jobRun, nil).Once()
169175
jobRunRepo.On("Update", ctx, jobRun.ID, event.EventTime, scheduler.StateSuccess).Return(nil)
176+
jobRunRepo.On("UpdateMonitoring", ctx, jobRun.ID, monitoring).Return(nil)
170177
defer jobRunRepo.AssertExpectations(t)
171178

172179
operatorRunRepo := new(mockOperatorRunRepository)
@@ -193,7 +200,8 @@ func TestJobRunService(t *testing.T) {
193200
JobScheduledAt: scheduledAtTimeStamp,
194201
EventTime: eventTime,
195202
Values: map[string]any{
196-
"status": "success",
203+
"status": "success",
204+
"monitoring": monitoring,
197205
},
198206
}
199207

@@ -207,6 +215,7 @@ func TestJobRunService(t *testing.T) {
207215
jobRunRepo := new(mockJobRunRepository)
208216
jobRunRepo.On("GetByScheduledAt", ctx, tnnt, jobName, scheduledAtTimeStamp).Return(&jobRun, nil)
209217
jobRunRepo.On("Update", ctx, jobRun.ID, endTime, scheduler.StateSuccess).Return(nil)
218+
jobRunRepo.On("UpdateMonitoring", ctx, jobRun.ID, monitoring).Return(nil)
210219
defer jobRunRepo.AssertExpectations(t)
211220

212221
runService := service.NewJobRunService(logger,
@@ -251,7 +260,8 @@ func TestJobRunService(t *testing.T) {
251260
JobScheduledAt: scheduledAtTimeStamp,
252261
EventTime: eventTime,
253262
Values: map[string]any{
254-
"status": "success",
263+
"status": "success",
264+
"monitoring": monitoring,
255265
},
256266
}
257267

@@ -307,6 +317,7 @@ func TestJobRunService(t *testing.T) {
307317
jobRunRepo.On("Create", ctx, tnnt, jobName, scheduledAtTimeStamp, slaDefinitionInSec).Return(nil).Once()
308318
jobRunRepo.On("GetByScheduledAt", ctx, tnnt, jobName, scheduledAtTimeStamp).Return(&jobRun, nil).Once()
309319
jobRunRepo.On("Update", ctx, jobRun.ID, endTime, scheduler.StateSuccess).Return(nil)
320+
jobRunRepo.On("UpdateMonitoring", ctx, jobRun.ID, monitoring).Return(nil)
310321
defer jobRunRepo.AssertExpectations(t)
311322

312323
runService := service.NewJobRunService(logger,
@@ -1214,6 +1225,11 @@ func (m *mockJobRunRepository) UpdateSLA(ctx context.Context, slaObjects []*sche
12141225
return args.Error(0)
12151226
}
12161227

1228+
func (m *mockJobRunRepository) UpdateMonitoring(ctx context.Context, jobRunID uuid.UUID, monitoring map[string]any) error {
1229+
args := m.Called(ctx, jobRunID, monitoring)
1230+
return args.Error(0)
1231+
}
1232+
12171233
type JobRepository struct {
12181234
mock.Mock
12191235
}

ext/scheduler/airflow/__lib.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,10 @@ def job_success_event(context):
380380
"event_type": "TYPE_JOB_SUCCESS",
381381
"status": "success"
382382
}
383+
result_for_monitoring = get_result_for_monitoring_from_xcom(context)
384+
if result_for_monitoring is not None:
385+
meta['monitoring'] = result_for_monitoring
386+
383387
optimus_notify(context, meta)
384388
except Exception as e:
385389
print(e)
@@ -390,6 +394,10 @@ def job_failure_event(context):
390394
"event_type": "TYPE_FAILURE",
391395
"status": "failed"
392396
}
397+
result_for_monitoring = get_result_for_monitoring_from_xcom(context)
398+
if result_for_monitoring is not None:
399+
meta['monitoring'] = result_for_monitoring
400+
393401
optimus_notify(context, meta)
394402
except Exception as e:
395403
print(e)
@@ -498,8 +506,7 @@ def shouldSendSensorStartEvent(ctx):
498506
try:
499507
ti = ctx.get('task_instance')
500508
key = "sensorEvt/{}/{}/{}".format(ti.task_id , ctx.get('next_execution_date').strftime(TIMESTAMP_FORMAT) , ti.try_number)
501-
502-
ti.xcom_pull(key=key)
509+
503510
result = ti.xcom_pull(key=key)
504511
if not result:
505512
print("sending NEW sensor start event for attempt number ", ti.try_number)
@@ -510,6 +517,17 @@ def shouldSendSensorStartEvent(ctx):
510517
except Exception as e:
511518
print(e)
512519

520+
def get_result_for_monitoring_from_xcom(ctx):
    """Return the 'monitoring' entry of the XCom 'return_value', if any.

    Reads the task instance from ``ctx`` and pulls the XCom stored under the
    key 'return_value'. If that value is a dict containing a 'monitoring' key,
    the associated value is returned.

    Returns None when the pull fails, when the pulled value is not a dict, or
    when it has no 'monitoring' key. Errors are printed, never raised.
    """
    try:
        ti = ctx.get('task_instance')
        return_value = ti.xcom_pull(key='return_value')
    except Exception as e:
        print(f'error getting result for monitoring: {e}')
        # Bail out here: `return_value` was never bound, so falling through
        # would raise UnboundLocalError out of this best-effort helper.
        return None

    if isinstance(return_value, dict) and 'monitoring' in return_value:
        return return_value['monitoring']
    return None
513531

514532
def cleanup_xcom(ctx):
515533
try:

ext/scheduler/airflow/dag/dag.py.tmpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ init_container = k8s.V1Container(
157157
depends_on_past={{ if .JobDetails.Schedule.DependsOnPast }}True{{- else -}}False{{- end -}},
158158
in_cluster=True,
159159
is_delete_operator_pod=True,
160-
do_xcom_push=False,
160+
do_xcom_push=True,
161161
env_vars=executor_env_vars,
162162
{{- if gt .SLAMissDuration 0 }}
163163
sla=timedelta(seconds={{ .SLAMissDuration }}),
@@ -204,7 +204,7 @@ hook_{{$hookName}} = SuperKubernetesPodOperator(
204204
dag=dag,
205205
in_cluster=True,
206206
is_delete_operator_pod=True,
207-
do_xcom_push=False,
207+
do_xcom_push=True,
208208
env_vars=executor_env_vars,
209209
{{- if $t.IsFailHook }}
210210
trigger_rule="one_failed",

ext/scheduler/airflow/dag/expected_dag.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
depends_on_past=False,
127127
in_cluster=True,
128128
is_delete_operator_pod=True,
129-
do_xcom_push=False,
129+
do_xcom_push=True,
130130
env_vars=executor_env_vars,
131131
sla=timedelta(seconds=7200),
132132
resources=resources,
@@ -167,7 +167,7 @@
167167
dag=dag,
168168
in_cluster=True,
169169
is_delete_operator_pod=True,
170-
do_xcom_push=False,
170+
do_xcom_push=True,
171171
env_vars=executor_env_vars,
172172
resources=resources,
173173
reattach_on_restart=True,
@@ -205,7 +205,7 @@
205205
dag=dag,
206206
in_cluster=True,
207207
is_delete_operator_pod=True,
208-
do_xcom_push=False,
208+
do_xcom_push=True,
209209
env_vars=executor_env_vars,
210210
resources=resources,
211211
reattach_on_restart=True,
@@ -243,7 +243,7 @@
243243
dag=dag,
244244
in_cluster=True,
245245
is_delete_operator_pod=True,
246-
do_xcom_push=False,
246+
do_xcom_push=True,
247247
env_vars=executor_env_vars,
248248
trigger_rule="one_failed",
249249
resources=resources,
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Down migration: remove the monitoring column from job_run if it is present.
ALTER TABLE job_run DROP COLUMN IF EXISTS monitoring;
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- Up migration: add a JSONB monitoring column to job_run unless it already exists.
ALTER TABLE job_run ADD COLUMN IF NOT EXISTS monitoring JSONB;

internal/store/postgres/scheduler/job_run_repository.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package scheduler
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"time"
78

@@ -16,7 +17,7 @@ import (
1617

1718
const (
1819
columnsToStore = `job_name, namespace_name, project_name, scheduled_at, start_time, end_time, status, sla_definition, sla_alert`
19-
jobRunColumns = `id, ` + columnsToStore
20+
jobRunColumns = `id, ` + columnsToStore + `, monitoring`
2021
)
2122

2223
type JobRunRepository struct {
@@ -40,6 +41,8 @@ type jobRun struct {
4041

4142
CreatedAt time.Time
4243
UpdatedAt time.Time
44+
45+
Monitoring json.RawMessage
4346
}
4447

4548
func (j jobRun) toJobRun() (*scheduler.JobRun, error) {
@@ -51,14 +54,21 @@ func (j jobRun) toJobRun() (*scheduler.JobRun, error) {
5154
if err != nil {
5255
return nil, errors.AddErrContext(err, scheduler.EntityJobRun, "invalid job run state in database")
5356
}
57+
var monitoring map[string]any
58+
if j.Monitoring != nil {
59+
if err := json.Unmarshal(j.Monitoring, &monitoring); err != nil {
60+
return nil, errors.AddErrContext(err, scheduler.EntityJobRun, "invalid monitoring values in database")
61+
}
62+
}
5463
return &scheduler.JobRun{
55-
ID: j.ID,
56-
JobName: scheduler.JobName(j.JobName),
57-
Tenant: t,
58-
State: state,
59-
StartTime: j.StartTime,
60-
SLAAlert: j.SLAAlert,
61-
EndTime: j.EndTime,
64+
ID: j.ID,
65+
JobName: scheduler.JobName(j.JobName),
66+
Tenant: t,
67+
State: state,
68+
StartTime: j.StartTime,
69+
SLAAlert: j.SLAAlert,
70+
EndTime: j.EndTime,
71+
Monitoring: monitoring,
6272
}, nil
6373
}
6474

@@ -67,7 +77,7 @@ func (j *JobRunRepository) GetByID(ctx context.Context, id scheduler.JobRunID) (
6777
getJobRunByID := `SELECT ` + jobRunColumns + ` FROM job_run where id = $1`
6878
err := j.db.QueryRow(ctx, getJobRunByID, id.UUID()).
6979
Scan(&jr.ID, &jr.JobName, &jr.NamespaceName, &jr.ProjectName, &jr.ScheduledAt, &jr.StartTime, &jr.EndTime,
70-
&jr.Status, &jr.SLADefinition, &jr.SLAAlert)
80+
&jr.Status, &jr.SLADefinition, &jr.SLAAlert, &jr.Monitoring)
7181
if err != nil {
7282
return nil, err
7383
}
@@ -79,7 +89,7 @@ func (j *JobRunRepository) GetByScheduledAt(ctx context.Context, t tenant.Tenant
7989
getJobRunByID := `SELECT ` + jobRunColumns + `, created_at FROM job_run j where project_name = $1 and namespace_name = $2 and job_name = $3 and scheduled_at = $4 order by created_at desc limit 1`
8090
err := j.db.QueryRow(ctx, getJobRunByID, t.ProjectName(), t.NamespaceName(), jobName, scheduledAt).
8191
Scan(&jr.ID, &jr.JobName, &jr.NamespaceName, &jr.ProjectName, &jr.ScheduledAt, &jr.StartTime, &jr.EndTime,
82-
&jr.Status, &jr.SLADefinition, &jr.SLAAlert, &jr.CreatedAt)
92+
&jr.Status, &jr.SLADefinition, &jr.SLAAlert, &jr.Monitoring, &jr.CreatedAt)
8393

8494
if err != nil {
8595
if errors.Is(err, pgx.ErrNoRows) {
@@ -110,6 +120,16 @@ func (j *JobRunRepository) UpdateSLA(ctx context.Context, slaObjects []*schedule
110120
return errors.WrapIfErr(scheduler.EntityJobRun, "unable to update SLA", err)
111121
}
112122

123+
func (j *JobRunRepository) UpdateMonitoring(ctx context.Context, jobRunID uuid.UUID, monitoringValues map[string]any) error {
124+
monitoringBytes, err := json.Marshal(monitoringValues)
125+
if err != nil {
126+
return errors.Wrap(scheduler.EntityJobRun, "error marshalling monitoring values", err)
127+
}
128+
query := `update job_run set monitoring = $1 where id = $2`
129+
_, err = j.db.Exec(ctx, query, monitoringBytes, jobRunID)
130+
return errors.WrapIfErr(scheduler.EntityJobRun, "cannot update monitoring", err)
131+
}
132+
113133
func (j *JobRunRepository) Create(ctx context.Context, t tenant.Tenant, jobName scheduler.JobName, scheduledAt time.Time, slaDefinitionInSec int64) error {
114134
insertJobRun := `INSERT INTO job_run (` + columnsToStore + `, created_at, updated_at) values ($1, $2, $3, $4, NOW(), TIMESTAMP '3000-01-01 00:00:00', $5, $6, FALSE, NOW(), NOW())`
115135
_, err := j.db.Exec(ctx, insertJobRun, jobName, t.NamespaceName(), t.ProjectName(), scheduledAt, scheduler.StateRunning, slaDefinitionInSec)

internal/store/postgres/scheduler/job_run_repository_test.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,27 @@ func TestPostgresJobRunRepository(t *testing.T) {
9393
assert.True(t, jobRunByID.SLAAlert)
9494
})
9595
})
96+
t.Run("UpdateMonitoring", func(t *testing.T) {
97+
t.Run("updates job run monitoring", func(t *testing.T) {
98+
db := dbSetup()
99+
_ = addJobs(ctx, t, db)
100+
jobRunRepo := postgres.NewJobRunRepository(db)
101+
err := jobRunRepo.Create(ctx, tnnt, jobAName, scheduledAt, slaDefinitionInSec)
102+
assert.NoError(t, err)
103+
jobRun, err := jobRunRepo.GetByScheduledAt(ctx, tnnt, jobAName, scheduledAt)
104+
assert.NoError(t, err)
105+
106+
var monitoring = map[string]any{
107+
"slot_millis": float64(5000),
108+
"total_bytes_processed": float64(2500),
109+
}
110+
111+
err = jobRunRepo.UpdateMonitoring(ctx, jobRun.ID, monitoring)
112+
assert.NoError(t, err)
113+
114+
jobRunByID, err := jobRunRepo.GetByID(ctx, scheduler.JobRunID(jobRun.ID))
115+
assert.NoError(t, err)
116+
assert.EqualValues(t, monitoring, jobRunByID.Monitoring)
117+
})
118+
})
96119
}

0 commit comments

Comments
 (0)