Skip to content

Commit a577ede

Browse files
authored
fix: cross entity sensor should use updated job metadata api (#23)
Signed-off-by: Kush Sharma <[email protected]>
1 parent d11711f commit a577ede

File tree

7 files changed

+81
-25
lines changed

7 files changed

+81
-25
lines changed

api/handler/v1/runtime.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -486,13 +486,7 @@ func (sv *RuntimeServiceServer) JobStatus(ctx context.Context, req *pb.JobStatus
486486
return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: project %s not found", err.Error(), req.GetProjectName()))
487487
}
488488

489-
namespaceRepo := sv.namespaceRepoFactory.New(projSpec)
490-
namespaceSpec, err := namespaceRepo.GetByName(req.GetNamespace())
491-
if err != nil {
492-
return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: namespace %s not found", err.Error(), req.GetNamespace()))
493-
}
494-
495-
_, err = sv.jobSvc.GetByName(req.GetJobName(), namespaceSpec)
489+
_, _, err = sv.jobSvc.GetByNameForProject(req.GetJobName(), projSpec)
496490
if err != nil {
497491
return nil, status.Error(codes.NotFound, fmt.Sprintf("%s: failed to find the job %s for namespace %s", err.Error(),
498492
req.GetJobName(), req.GetNamespace()))
@@ -504,7 +498,7 @@ func (sv *RuntimeServiceServer) JobStatus(ctx context.Context, req *pb.JobStatus
504498
req.GetJobName()))
505499
}
506500

507-
adaptedJobStatus := []*pb.JobStatus{}
501+
var adaptedJobStatus []*pb.JobStatus
508502
for _, jobStatus := range jobStatuses {
509503
ts, err := ptypes.TimestampProto(jobStatus.ScheduledAt)
510504
if err != nil {

ext/scheduler/airflow/airflow.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (a *scheduler) GetJobStatus(ctx context.Context, projSpec models.ProjectSpe
128128
return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchURL)
129129
}
130130
if resp.StatusCode != http.StatusOK {
131-
return nil, errors.Errorf("failed to fetch airflow dag runs from %s", fetchURL)
131+
return nil, errors.Errorf("failed to fetch airflow dag runs from %s: %d", fetchURL, resp.StatusCode)
132132
}
133133
defer resp.Body.Close()
134134

@@ -196,7 +196,7 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN
196196
return errors.Wrapf(err, "failed to clear airflow dag runs from %s", clearDagRunURL)
197197
}
198198
if resp.StatusCode != http.StatusOK {
199-
return errors.Errorf("failed to clear airflow dag runs from %s", clearDagRunURL)
199+
return errors.Errorf("failed to clear airflow dag runs from %s: %d", clearDagRunURL, resp.StatusCode)
200200
}
201201

202202
body, err := ioutil.ReadAll(resp.Body)

ext/scheduler/airflow/resources/__lib.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -443,12 +443,13 @@ def get_task_window(self, scheduled_at: str, window_size: str, window_offset: st
443443
return response.json()
444444

445445
def get_job_metadata(self, execution_date, project, job) -> dict:
446-
url = '{optimus_host}/api/v1/instance'.format(optimus_host=self.host)
446+
url = '{optimus_host}/api/v1/project/{project_name}/job/{job_name}/instance'.format(optimus_host=self.host,
447+
project_name=project,
448+
job_name=job)
447449
request_data = {
448450
"scheduledAt": execution_date,
449-
"projectName": project,
450-
"jobName": job,
451-
"type": "hook",
451+
"instance_type": "TASK",
452+
"instance_name": "none"
452453
}
453454
response = requests.post(url, data=json.dumps(request_data))
454455
self._raise_error_if_request_failed(response)

ext/scheduler/airflow2/airflow.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package airflow2
33
import (
44
"bytes"
55
"context"
6+
"encoding/base64"
67
"encoding/json"
78
"fmt"
89
"io"
@@ -28,7 +29,7 @@ var resBaseDAG []byte
2829

2930
const (
3031
baseLibFileName = "__lib.py"
31-
dagStatusUrl = "api/v1/dags/%s/dagRuns"
32+
dagStatusUrl = "api/v1/dags/%s/dagRuns?limit=99999"
3233
dagRunClearURL = "api/v1/dags/%s/clearTaskInstances"
3334
)
3435

@@ -116,19 +117,24 @@ func (a *scheduler) GetJobStatus(ctx context.Context, projSpec models.ProjectSpe
116117
return nil, errors.Errorf("scheduler host not set for %s", projSpec.Name)
117118
}
118119
schdHost = strings.Trim(schdHost, "/")
120+
authToken, ok := projSpec.Secret.GetByName(models.ProjectSchedulerAuth)
121+
if !ok {
122+
return nil, errors.Errorf("%s secret not configured for project %s", models.ProjectSchedulerAuth, projSpec.Name)
123+
}
119124

120-
fetchUrl := fmt.Sprintf(fmt.Sprintf("%s/%s", schdHost, dagStatusUrl), jobName)
121-
request, err := http.NewRequest(http.MethodGet, fetchUrl, nil)
125+
fetchURL := fmt.Sprintf(fmt.Sprintf("%s/%s", schdHost, dagStatusUrl), jobName)
126+
request, err := http.NewRequest(http.MethodGet, fetchURL, nil)
122127
if err != nil {
123-
return nil, errors.Wrapf(err, "failed to build http request for %s", fetchUrl)
128+
return nil, errors.Wrapf(err, "failed to build http request for %s", fetchURL)
124129
}
130+
request.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(authToken))))
125131

126132
resp, err := a.httpClient.Do(request)
127133
if err != nil {
128-
return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchUrl)
134+
return nil, errors.Wrapf(err, "failed to fetch airflow dag runs from %s", fetchURL)
129135
}
130136
if resp.StatusCode != http.StatusOK {
131-
return nil, errors.Errorf("failed to fetch airflow dag runs from %s", fetchUrl)
137+
return nil, errors.Errorf("failed to fetch airflow dag runs from %s: %d", fetchURL, resp.StatusCode)
132138
}
133139
defer resp.Body.Close()
134140

@@ -184,6 +190,10 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN
184190
if !ok {
185191
return errors.Errorf("scheduler host not set for %s", projSpec.Name)
186192
}
193+
authToken, ok := projSpec.Secret.GetByName(models.ProjectSchedulerAuth)
194+
if !ok {
195+
return errors.Errorf("%s secret not configured for project %s", models.ProjectSchedulerAuth, projSpec.Name)
196+
}
187197

188198
schdHost = strings.Trim(schdHost, "/")
189199
airflowDateFormat := "2006-01-02T15:04:05+00:00"
@@ -199,13 +209,14 @@ func (a *scheduler) Clear(ctx context.Context, projSpec models.ProjectSpec, jobN
199209
return errors.Wrapf(err, "failed to build http request for %s", postURL)
200210
}
201211
request.Header.Set("Content-Type", "application/json")
212+
request.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(authToken))))
202213

203214
resp, err := a.httpClient.Do(request)
204215
if err != nil {
205216
return errors.Wrapf(err, "failed to clear airflow dag runs from %s", postURL)
206217
}
207218
if resp.StatusCode != http.StatusOK {
208-
return errors.Errorf("failed to clear airflow dag runs from %s", postURL)
219+
return errors.Errorf("failed to clear airflow dag runs from %s: %d", postURL, resp.StatusCode)
209220
}
210221
defer resp.Body.Close()
211222

ext/scheduler/airflow2/airflow_test.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,13 @@ func TestAirflow2(t *testing.T) {
140140
Name: "test-proj",
141141
Config: map[string]string{
142142
models.ProjectSchedulerHost: host,
143+
models.ProjectSchedulerAuth: "admin:admin",
144+
},
145+
Secret: []models.ProjectSecretItem{
146+
{
147+
Name: models.ProjectSchedulerAuth,
148+
Value: "admin:admin",
149+
},
143150
},
144151
}, "sample_select")
145152

@@ -164,11 +171,27 @@ func TestAirflow2(t *testing.T) {
164171
Config: map[string]string{
165172
models.ProjectSchedulerHost: host,
166173
},
174+
Secret: []models.ProjectSecretItem{
175+
{
176+
Name: models.ProjectSchedulerAuth,
177+
Value: "admin:admin",
178+
},
179+
},
167180
}, "sample_select")
168181

169182
assert.NotNil(t, err)
170183
assert.Len(t, status, 0)
171184
})
185+
t.Run("should fail if not scheduler secret registered", func(t *testing.T) {
186+
air := airflow2.NewScheduler(nil, nil)
187+
_, err := air.GetJobStatus(ctx, models.ProjectSpec{
188+
Name: "test-proj",
189+
Config: map[string]string{
190+
models.ProjectSchedulerHost: host,
191+
},
192+
}, "sample_select")
193+
assert.NotNil(t, err)
194+
})
172195
})
173196
t.Run("Clear", func(t *testing.T) {
174197
host := "http://airflow.example.io"
@@ -195,6 +218,12 @@ func TestAirflow2(t *testing.T) {
195218
Config: map[string]string{
196219
models.ProjectSchedulerHost: host,
197220
},
221+
Secret: []models.ProjectSecretItem{
222+
{
223+
Name: models.ProjectSchedulerAuth,
224+
Value: "admin:admin",
225+
},
226+
},
198227
}, "sample_select", startDateTime, endDateTime)
199228

200229
assert.Nil(t, err)
@@ -217,9 +246,25 @@ func TestAirflow2(t *testing.T) {
217246
Config: map[string]string{
218247
models.ProjectSchedulerHost: host,
219248
},
249+
Secret: []models.ProjectSecretItem{
250+
{
251+
Name: models.ProjectSchedulerAuth,
252+
Value: "admin:admin",
253+
},
254+
},
220255
}, "sample_select", startDateTime, endDateTime)
221256

222257
assert.NotNil(t, err)
223258
})
259+
t.Run("should fail if not scheduler secret registered", func(t *testing.T) {
260+
air := airflow2.NewScheduler(nil, nil)
261+
err := air.Clear(ctx, models.ProjectSpec{
262+
Name: "test-proj",
263+
Config: map[string]string{
264+
models.ProjectSchedulerHost: host,
265+
},
266+
}, "sample_select", startDateTime, endDateTime)
267+
assert.NotNil(t, err)
268+
})
224269
})
225270
}

ext/scheduler/airflow2/resources/__lib.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ def _xcom_value_has_error(_xcom) -> bool:
326326
)
327327
return failed_alert.execute(context=context)
328328

329+
329330
class OptimusAPIClient:
330331
def __init__(self, optimus_host):
331332
self.host = self._add_connection_adapter_if_absent(optimus_host)
@@ -358,12 +359,13 @@ def get_task_window(self, scheduled_at: str, window_size: str, window_offset: st
358359
return response.json()
359360

360361
def get_job_metadata(self, execution_date, project, job) -> dict:
361-
url = '{optimus_host}/api/v1/instance'.format(optimus_host=self.host)
362+
url = '{optimus_host}/api/v1/project/{project_name}/job/{job_name}/instance'.format(optimus_host=self.host,
363+
project_name=project,
364+
job_name=job)
362365
request_data = {
363366
"scheduledAt": execution_date,
364-
"projectName": project,
365-
"jobName": job,
366-
"type": "hook",
367+
"instance_type": "TASK",
368+
"instance_name": "none"
367369
}
368370
response = requests.post(url, data=json.dumps(request_data))
369371
self._raise_error_if_request_failed(response)

models/project.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ const (
1818
// Secret used for uploading prepared scheduler specifications to cloud
1919
// e.g. for gcs it will be base64 encoded service account for the bucket
2020
ProjectSecretStorageKey = "STORAGE"
21+
22+
// Secret used to authenticate with scheduler provided at ProjectSchedulerHost
23+
ProjectSchedulerAuth = "SCHEDULER_AUTH"
2124
)
2225

2326
var (

0 commit comments

Comments
 (0)