Skip to content

Commit eb9cf80

Browse files
fix: fix sensor to pass version information to compute the start & end times (#614) (#617)
1 parent 40409a4 commit eb9cf80

File tree

11 files changed

+40
-40
lines changed

11 files changed

+40
-40
lines changed

api/handler/v1beta1/job_run.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,6 @@ func (*JobRunServiceServer) GetWindow(_ context.Context, req *pb.GetWindowReques
282282
return nil, status.Errorf(codes.Internal, "%s: failed to parse schedule time %s", err.Error(), req.GetScheduledAt())
283283
}
284284

285-
if req.GetSize() == "" || req.GetOffset() == "" || req.GetTruncateTo() == "" {
286-
return nil, status.Error(codes.InvalidArgument, "window size, offset and truncate_to must be provided")
287-
}
288-
289285
window, err := models.NewWindow(int(req.Version), req.GetTruncateTo(), req.GetOffset(), req.GetSize())
290286
if err != nil {
291287
return nil, err

api/handler/v1beta1/job_run_test.go

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -585,28 +585,6 @@ func TestJobRunServiceServer(t *testing.T) {
585585
assert.Equal(t, "2020-11-11T00:00:00Z", resp.GetStart().AsTime().Format(time.RFC3339))
586586
assert.Equal(t, "2020-11-12T00:00:00Z", resp.GetEnd().AsTime().Format(time.RFC3339))
587587
})
588-
589-
t.Run("should return error if any of the required fields in request is missing", func(t *testing.T) {
590-
JobRunServiceServer := v1.NewJobRunServiceServer(
591-
log,
592-
nil, nil,
593-
nil, nil, nil,
594-
nil,
595-
nil,
596-
monitoringService,
597-
nil,
598-
)
599-
scheduledAt := time.Date(2020, 11, 11, 0, 0, 0, 0, time.UTC)
600-
scheduledAtTimestamp := timestamppb.New(scheduledAt)
601-
req := pb.GetWindowRequest{
602-
ScheduledAt: scheduledAtTimestamp,
603-
Size: "",
604-
Offset: "24h",
605-
TruncateTo: "d",
606-
}
607-
_, err := JobRunServiceServer.GetWindow(ctx, &req)
608-
assert.Equal(t, "rpc error: code = InvalidArgument desc = window size, offset and truncate_to must be provided", err.Error())
609-
})
610588
})
611589
t.Run("JobRun", func(t *testing.T) {
612590
date, err := time.Parse(AirflowDateFormat, "2022-03-25T02:00:00+00:00")

cmd/internal/survey/job_create.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ func (*JobCreateSurvey) getWindowParameters(winName string) local.JobTaskWindow
268268
return local.JobTaskWindow{
269269
Size: "24h",
270270
Offset: "0",
271-
TruncateTo: "h",
271+
TruncateTo: "d",
272272
}
273273
case "weekly":
274274
return local.JobTaskWindow{

ext/scheduler/airflow2/resources/__lib.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,12 @@ def get_job_run(self, optimus_project: str, optimus_job: str, startDate: str, en
151151
self._raise_error_if_request_failed(response)
152152
return response.json()
153153

154-
def get_task_window(self, scheduled_at: str, window_size: str, window_offset: str,
154+
def get_task_window(self, scheduled_at: str, version: int, window_size: str, window_offset: str,
155155
window_truncate_upto: str) -> dict:
156-
url = '{optimus_host}/api/v1beta1/window?scheduledAt={scheduled_at}&size={window_size}&offset={window_offset}&truncate_to={window_truncate_upto}'.format(
156+
url = '{optimus_host}/api/v1beta1/window?scheduledAt={scheduled_at}&version={window_version}&size={window_size}&offset={window_offset}&truncate_to={window_truncate_upto}'.format(
157157
optimus_host=self.host,
158158
scheduled_at=scheduled_at,
159+
window_version=version,
159160
window_size=window_size,
160161
window_offset=window_offset,
161162
window_truncate_upto=window_truncate_upto,
@@ -195,7 +196,8 @@ def _raise_error_if_request_failed(self, response):
195196

196197

197198
class JobSpecTaskWindow:
198-
def __init__(self, size: str, offset: str, truncate_to: str, optimus_client: OptimusAPIClient):
199+
def __init__(self, version: int, size: str, offset: str, truncate_to: str, optimus_client: OptimusAPIClient):
200+
self.version = version
199201
self.size = size
200202
self.offset = offset
201203
self.truncate_to = truncate_to
@@ -229,7 +231,7 @@ def _parse_datetime_utc_str(self, timestamp):
229231
return timestamp.strftime(TIMESTAMP_FORMAT)
230232

231233
def _fetch_task_window(self, scheduled_at: str) -> dict:
232-
return self._optimus_client.get_task_window(scheduled_at, self.size, self.offset, self.truncate_to)
234+
return self._optimus_client.get_task_window(scheduled_at, self.version, self.size, self.offset, self.truncate_to)
233235

234236

235237
class SuperExternalTaskSensor(BaseSensorOperator):
@@ -242,14 +244,16 @@ def __init__(
242244
upstream_optimus_namespace: str,
243245
upstream_optimus_job: str,
244246
window_size: str,
247+
window_version: int,
245248
*args,
246249
**kwargs) -> None:
247250
kwargs['mode'] = kwargs.get('mode', 'reschedule')
248251
super().__init__(**kwargs)
249252
self.optimus_project = upstream_optimus_project
250-
self.optimus_namespace =upstream_optimus_namespace
253+
self.optimus_namespace = upstream_optimus_namespace
251254
self.optimus_job = upstream_optimus_job
252255
self.window_size = window_size
256+
self.window_version = window_version
253257
self._optimus_client = OptimusAPIClient(optimus_hostname)
254258

255259
def poke(self, context):
@@ -267,7 +271,7 @@ def poke(self, context):
267271
schedule_time, upstream_schedule)
268272

269273
# get schedule window
270-
task_window = JobSpecTaskWindow(self.window_size, 0, "m", self._optimus_client)
274+
task_window = JobSpecTaskWindow(self.window_version, self.window_size, 0, "", self._optimus_client)
271275
schedule_time_window_start, schedule_time_window_end = task_window.get_schedule_window(
272276
last_upstream_schedule_time.strftime(TIMESTAMP_FORMAT),upstream_schedule)
273277

@@ -362,7 +366,7 @@ def _xcom_value_has_error(_xcom) -> bool:
362366

363367
"job_run_id" : context.get('dag_run').run_id,
364368
"task_run_id" : context.get('run_id'),
365-
369+
366370
"attempt" :context['task_instance'].try_number,
367371
"event_time" :datetime.now().timestamp(),
368372
}
@@ -445,7 +449,7 @@ def log_failure_event(context):
445449
if not should_relay_airflow_callbacks(context):
446450
return
447451
run_type = get_run_type(context)
448-
452+
449453
meta = {
450454
"event_type": "TYPE_{}_FAIL".format(run_type)
451455
}

ext/scheduler/airflow2/resources/base_dag.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@
220220
upstream_optimus_namespace="{{$dependency.Job.NamespaceSpec.Name}}",
221221
upstream_optimus_job="{{$dependency.Job.Name}}",
222222
window_size="{{ $baseWindow.GetSize }}",
223+
window_version=int("{{ $baseWindow.GetVersion }}"),
223224
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
224225
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
225226
task_id="wait_{{$dependency.Job.Name | trunc 200}}-{{$dependencySchema.Name}}",
@@ -235,6 +236,7 @@
235236
upstream_optimus_namespace="{{$dependency.NamespaceName}}",
236237
upstream_optimus_job="{{$dependency.JobName}}",
237238
window_size="{{ $baseWindow.GetSize }}",
239+
window_version=int("{{ $baseWindow.GetVersion }}"),
238240
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
239241
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
240242
task_id="wait_{{$dependency.JobName | trunc 200}}-{{$dependency.TaskName}}",

ext/scheduler/airflow2/resources/expected_compiled_template.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@
219219
upstream_optimus_namespace="bar-namespace",
220220
upstream_optimus_job="foo-intra-dep-job",
221221
window_size="1h",
222+
window_version=int("1"),
222223
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
223224
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
224225
task_id="wait_foo-intra-dep-job-bq",
@@ -230,6 +231,7 @@
230231
upstream_optimus_namespace="bar-namespace",
231232
upstream_optimus_job="foo-inter-dep-job",
232233
window_size="1h",
234+
window_version=int("1"),
233235
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
234236
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
235237
task_id="wait_foo-inter-dep-job-bq",
@@ -242,6 +244,7 @@
242244
upstream_optimus_namespace="bar-external-optimus-namespace",
243245
upstream_optimus_job="foo-external-optimus-dep-job",
244246
window_size="1h",
247+
window_version=int("1"),
245248
poke_interval=SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS,
246249
timeout=SENSOR_DEFAULT_TIMEOUT_IN_SECS,
247250
task_id="wait_foo-external-optimus-dep-job-bq",

models/window.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type Window interface {
1515
GetTruncateTo() string
1616
GetOffset() string
1717
GetSize() string
18+
GetVersion() int
1819
}
1920

2021
func NewWindow(version int, truncateTo, offset, size string) (Window, error) {

models/window_v1.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ func (w windowV1) Validate() error {
2727
return err
2828
}
2929

30+
func (windowV1) GetVersion() int {
31+
return 1
32+
}
33+
3034
func (w windowV1) GetTruncateTo() string {
3135
return w.truncateTo
3236
}
@@ -74,7 +78,7 @@ func (w *windowV1) prepareWindow() (JobSpecTaskWindow, error) {
7478
window := JobSpecTaskWindow{}
7579
window.Size = time.Hour * HoursInDay
7680
window.Offset = 0
77-
window.TruncateTo = "d"
81+
window.TruncateTo = ""
7882

7983
if w.truncateTo != "" {
8084
window.TruncateTo = w.truncateTo
@@ -128,7 +132,7 @@ func (*JobSpecTaskWindow) getWindowDate(today time.Time, windowSize, windowOffse
128132
windowStart := windowEnd.Add(-windowSize)
129133

130134
// handle monthly windows separately as every month is not of same size
131-
if windowTruncateTo == "M" || windowTruncateTo == "m" {
135+
if windowTruncateTo == "M" {
132136
floatingEnd = today
133137
// shift current window to nearest month start and end
134138

models/window_v1_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestWindowV1(t *testing.T) {
3535
}
3636
})
3737
t.Run("should not return error for valid window truncate configs", func(t *testing.T) {
38-
validTruncateConfigs := []string{"h", "d", "w", "m", "M", ""}
38+
validTruncateConfigs := []string{"h", "d", "w", "M", ""}
3939
for _, config := range validTruncateConfigs {
4040
window, err := models.NewWindow(1, config, "", "")
4141
if err != nil {
@@ -69,8 +69,8 @@ func TestWindowV1(t *testing.T) {
6969
Size: "24h",
7070
Offset: "0",
7171
TruncateTo: "",
72-
ExpectedStartTime: time.Date(2020, 7, 9, 0, 0, 0, 0, time.UTC), // modified from the original, since it was not consistent with the implementation default truncate. original [time.Date(2020, 7, 9, 6, 33, 22, 0, time.UTC)]
73-
ExpectedEndTime: time.Date(2020, 7, 10, 0, 0, 0, 0, time.UTC), // modified from the original, since it was not consistent with the implementation default truncate. original [time.Date(2020, 7, 10, 6, 33, 22, 0, time.UTC)]
72+
ExpectedStartTime: time.Date(2020, 7, 9, 6, 33, 22, 0, time.UTC), // modified from the original, since it was not consistent with the implementation default truncate. original [time.Date(2020, 7, 9, 6, 33, 22, 0, time.UTC)]
73+
ExpectedEndTime: time.Date(2020, 7, 10, 6, 33, 22, 0, time.UTC), // modified from the original, since it was not consistent with the implementation default truncate. original [time.Date(2020, 7, 10, 6, 33, 22, 0, time.UTC)]
7474
},
7575
{
7676
ScheduleTime: time.Date(2020, 7, 10, 6, 33, 22, 0, time.UTC),

models/window_v2.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ type windowV2 struct {
1515
size string
1616
}
1717

18+
func (windowV2) GetVersion() int {
19+
return 2 //nolint:gomnd
20+
}
21+
1822
func (w windowV2) Validate() error {
1923
if err := w.validateTruncateTo(); err != nil {
2024
return fmt.Errorf("error validating truncate_to: %w", err)

0 commit comments

Comments
 (0)