Skip to content

Commit 2ce5ba4

Browse files
fix: sensor to use scheduled_time & all the window params (#211)
* chore: cleanup pack-files step as it is not needed * fix: sensor to use scheduled_time & all the window params
1 parent a2a0f91 commit 2ce5ba4

File tree

5 files changed

+25
-24
lines changed

5 files changed

+25
-24
lines changed

.github/workflows/lint.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ jobs:
1717
- uses: actions/checkout@v2
1818
with:
1919
fetch-depth: 0
20-
- name: generate dependencies
21-
run: make pack-files
2220
- name: golangci-lint
2321
uses: golangci/golangci-lint-action@v2
2422
with:

Makefile

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,18 @@ LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
77
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
88
PROTON_COMMIT := "2a30976f7b40884ddd90e1792576c0941426e8bc"
99

10-
.PHONY: build test test-ci pack-files generate-proto unit-test-ci smoke-test integration-test vet coverage clean install lint
10+
.PHONY: build test test-ci generate-proto unit-test-ci smoke-test integration-test vet coverage clean install lint
1111

1212
.DEFAULT_GOAL := build
1313

14-
build: pack-files # build optimus binary
14+
build: # build optimus binary
1515
@echo " > notice: skipped proto generation, use 'generate-proto' make command"
1616
@echo " > building optimus version ${OPMS_VERSION}"
1717
@go build -ldflags "-X ${NAME}/config.Version=${OPMS_VERSION} -X ${NAME}/config.BuildCommit=${LAST_COMMIT}" -o optimus .
1818
@echo " - build complete"
1919

2020
test-ci: smoke-test unit-test-ci vet ## run tests
2121

22-
pack-files:
23-
@echo " > packing resources"
24-
@go generate ./..
25-
2622
generate-proto: ## regenerate protos
2723
@echo " > generating protobuf from odpf/proton"
2824
@echo " > [info] make sure correct version of dependencies are installed using 'make install'"

ext/scheduler/airflow2/resources/__lib.py

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,8 @@ def poke(self, context, session=None):
174174
'{} was deleted.'.format(self.upstream_dag))
175175

176176
# calculate windows
177-
execution_date = context['execution_date']
178-
window_start, window_end = self.generate_window(execution_date, self.window_size)
177+
schedule_time = context['next_execution_date']
178+
window_start, window_end = self.generate_window(schedule_time)
179179
self.log.info(
180180
"consuming upstream window between: {} - {}".format(window_start.isoformat(), window_end.isoformat()))
181181
self.log.info("upstream interval: {}".format(dag_to_wait.schedule_interval))
@@ -208,12 +208,12 @@ def poke(self, context, session=None):
208208

209209
return True
210210

211-
def generate_window(self, execution_date, window_size):
211+
def generate_window(self, schedule_time):
212212
format_rfc3339 = "%Y-%m-%dT%H:%M:%SZ"
213-
execution_date_str = execution_date.strftime(format_rfc3339)
213+
schedule_time_str = schedule_time.strftime(format_rfc3339)
214214
# ignore offset & truncateto
215-
task_window = JobSpecTaskWindow(window_size, 0, "m", self._optimus_client)
216-
return task_window.get(execution_date_str)
215+
task_window = JobSpecTaskWindow(self.window_size, self.window_offset, self.window_truncate_to, self._optimus_client)
216+
return task_window.get(schedule_time_str)
217217

218218
def _get_expected_upstream_executions(self, schedule_interval, window_start, window_end):
219219
expected_upstream_executions = []
@@ -325,24 +325,28 @@ def __init__(
325325
upstream_optimus_project: str,
326326
upstream_optimus_job: str,
327327
window_size: str,
328+
window_offset: str,
329+
window_truncate_to: str,
328330
*args,
329331
**kwargs) -> None:
330332
kwargs['mode'] = kwargs.get('mode', 'reschedule')
331333
super().__init__(**kwargs)
332334
self.optimus_project = upstream_optimus_project
333335
self.optimus_job = upstream_optimus_job
334336
self.window_size = window_size
337+
self.window_offset = window_offset
338+
self.window_truncate_to = window_truncate_to
335339
self._optimus_client = OptimusAPIClient(optimus_hostname)
336340

337341
def poke(self, context):
338-
execution_date = context['execution_date']
339-
execution_date_str = execution_date.strftime(self.TIMESTAMP_FORMAT)
342+
schedule_time = context['next_execution_date']
343+
schedule_time_str = schedule_time.strftime(self.TIMESTAMP_FORMAT)
340344

341345
# parse relevant metadata from the job metadata to build the task window
342-
job_metadata = self._optimus_client.get_job_metadata(execution_date_str, self.optimus_project, self.optimus_job)
346+
job_metadata = self._optimus_client.get_job_metadata(schedule_time_str, self.optimus_project, self.optimus_job)
343347
cron_schedule = lookup_non_standard_cron_expression(job_metadata['job']['interval'])
344348

345-
window_start, window_end = self.generate_window(execution_date, self.window_size)
349+
window_start, window_end = self.generate_window(schedule_time)
346350
self.log.info(
347351
"consuming upstream window between: {} - {}".format(window_start.isoformat(), window_end.isoformat()))
348352
expected_upstream_executions = self._get_expected_upstream_executions(cron_schedule, window_start, window_end)
@@ -388,12 +392,11 @@ def _parse_datetime(self, timestamp) -> datetime:
388392
except ValueError:
389393
return datetime.strptime(timestamp, self.TIMESTAMP_MS_FORMAT)
390394

391-
def generate_window(self, execution_date, window_size):
395+
def generate_window(self, schedule_time):
392396
format_rfc3339 = "%Y-%m-%dT%H:%M:%SZ"
393-
execution_date_str = execution_date.strftime(format_rfc3339)
394-
# ignore offset & truncate to
395-
task_window = JobSpecTaskWindow(window_size, 0, "m", self._optimus_client)
396-
return task_window.get(execution_date_str)
397+
schedule_time_str = schedule_time.strftime(format_rfc3339)
398+
task_window = JobSpecTaskWindow(self.window_size, self.window_offset, self.window_truncate_to, self._optimus_client)
399+
return task_window.get(schedule_time_str)
397400

398401

399402
def optimus_failure_notify(context):

ext/scheduler/airflow2/resources/base_dag.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,9 @@
195195
optimus_hostname="{{$.Hostname}}",
196196
upstream_optimus_project="{{$dependency.Project.Name}}",
197197
upstream_optimus_job="{{$dependency.Job.Name}}",
198-
window_size="{{ $baseWindow.Size.String }}",
198+
window_size={{$baseWindow.Size.String | quote}},
199+
window_offset={{$baseWindow.Offset.String | quote}},
200+
window_truncate_to={{$baseWindow.TruncateTo | quote}},
199201
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
200202
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
201203
task_id="wait_{{$dependency.Job.Name | trunc 200}}-{{$dependencySchema.Name}}",

ext/scheduler/airflow2/resources/expected_compiled_template.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@
198198
upstream_optimus_project="foo-external-project",
199199
upstream_optimus_job="foo-inter-dep-job",
200200
window_size="1h0m0s",
201+
window_offset="0s",
202+
window_truncate_to="d",
201203
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
202204
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
203205
task_id="wait_foo-inter-dep-job-bq",

0 commit comments

Comments
 (0)