Skip to content

Commit 6a1ec5a

Browse files
fix: provide a mechanism to contact optimus server and upstream optimus server (#624)
1 parent 71ef005 commit 6a1ec5a

File tree

3 files changed

+11
-4
lines changed

3 files changed

+11
-4
lines changed

ext/scheduler/airflow2/resources/__lib.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ class SuperExternalTaskSensor(BaseSensorOperator):
240240
def __init__(
241241
self,
242242
optimus_hostname: str,
243+
upstream_optimus_hostname: str,
243244
upstream_optimus_project: str,
244245
upstream_optimus_namespace: str,
245246
upstream_optimus_job: str,
@@ -255,6 +256,7 @@ def __init__(
255256
self.window_size = window_size
256257
self.window_version = window_version
257258
self._optimus_client = OptimusAPIClient(optimus_hostname)
259+
self._upstream_optimus_client = OptimusAPIClient(upstream_optimus_hostname)
258260

259261
def poke(self, context):
260262
log_start_event(context, EVENT_NAMES.get("SENSOR_START_EVENT"))
@@ -297,15 +299,15 @@ def get_last_upstream_times(self, schedule_time_of_current_job, upstream_schedul
297299

298300
def get_schedule_interval(self, schedule_time):
299301
schedule_time_str = schedule_time.strftime(TIMESTAMP_FORMAT)
300-
job_metadata = self._optimus_client.get_job_metadata(schedule_time_str, self.optimus_namespace, self.optimus_project, self.optimus_job)
302+
job_metadata = self._upstream_optimus_client.get_job_metadata(schedule_time_str, self.optimus_namespace, self.optimus_project, self.optimus_job)
301303
upstream_schedule = lookup_non_standard_cron_expression(job_metadata['spec']['interval'])
302304
return upstream_schedule
303305

304306
# TODO the api will be updated with getJobRuns even though the field here refers to scheduledAt
305307
# it points to execution_date
306308
def _are_all_job_runs_successful(self, schedule_time_window_start, schedule_time_window_end) -> bool:
307309
try:
308-
api_response = self._optimus_client.get_job_run(self.optimus_project, self.optimus_job, schedule_time_window_start, schedule_time_window_end)
310+
api_response = self._upstream_optimus_client.get_job_run(self.optimus_project, self.optimus_job, schedule_time_window_start, schedule_time_window_end)
309311
self.log.info("job_run api response :: {}".format(api_response))
310312
except Exception as e:
311313
self.log.warning("error while fetching job runs :: {}".format(e))

ext/scheduler/airflow2/resources/base_dag.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@
216216
{{- $dependencySchema := $dependency.Job.Task.Unit.Info }}
217217
wait_{{$dependency.Job.Name | replace "-" "__dash__" | replace "." "__dot__"}} = SuperExternalTaskSensor(
218218
optimus_hostname="{{$.Hostname}}",
219+
upstream_optimus_hostname="{{$.Hostname}}",
219220
upstream_optimus_project="{{$dependency.Project.Name}}",
220221
upstream_optimus_namespace="{{$dependency.Job.NamespaceSpec.Name}}",
221222
upstream_optimus_job="{{$dependency.Job.Name}}",
@@ -231,7 +232,8 @@
231232
{{- range $_, $dependency := $.Job.ExternalDependencies.OptimusDependencies}}
232233
{{ $identity := print $dependency.Name "-" $dependency.ProjectName "-" $dependency.JobName }}
233234
wait_{{ $identity | replace "-" "__dash__" | replace "." "__dot__"}} = SuperExternalTaskSensor(
234-
optimus_hostname="{{$dependency.Host}}",
235+
optimus_hostname="{{$.Hostname}}",
236+
upstream_optimus_hostname="{{$dependency.Host}}",
235237
upstream_optimus_project="{{$dependency.ProjectName}}",
236238
upstream_optimus_namespace="{{$dependency.NamespaceName}}",
237239
upstream_optimus_job="{{$dependency.JobName}}",

ext/scheduler/airflow2/resources/expected_compiled_template.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@
215215

216216
wait_foo__dash__intra__dash__dep__dash__job = SuperExternalTaskSensor(
217217
optimus_hostname="http://airflow.example.io",
218+
upstream_optimus_hostname="http://airflow.example.io",
218219
upstream_optimus_project="foo-project",
219220
upstream_optimus_namespace="bar-namespace",
220221
upstream_optimus_job="foo-intra-dep-job",
@@ -227,6 +228,7 @@
227228
)
228229
wait_foo__dash__inter__dash__dep__dash__job = SuperExternalTaskSensor(
229230
optimus_hostname="http://airflow.example.io",
231+
upstream_optimus_hostname="http://airflow.example.io",
230232
upstream_optimus_project="foo-external-project",
231233
upstream_optimus_namespace="bar-namespace",
232234
upstream_optimus_job="foo-inter-dep-job",
@@ -239,7 +241,8 @@
239241
)
240242

241243
wait_external__dash__optimus__dash__foo__dash__external__dash__optimus__dash__project__dash__foo__dash__external__dash__optimus__dash__dep__dash__job = SuperExternalTaskSensor(
242-
optimus_hostname="http://optimus.external.io",
244+
optimus_hostname="http://airflow.example.io",
245+
upstream_optimus_hostname="http://optimus.external.io",
243246
upstream_optimus_project="foo-external-optimus-project",
244247
upstream_optimus_namespace="bar-external-optimus-namespace",
245248
upstream_optimus_job="foo-external-optimus-dep-job",

0 commit comments

Comments
 (0)