@@ -14,6 +14,7 @@ import (
14
14
)
15
15
16
16
const (
17
+ queryDateId int = - 1
17
18
codeShareChildId int = 10
18
19
codeShareParentId int = 50
19
20
)
@@ -30,43 +31,44 @@ type ConvertFlightSchedulesOutput struct {
30
31
}
31
32
32
33
type cfsAction struct {
33
- s3c * s3. Client
34
+ s3c MinimalS3Client
34
35
}
35
36
36
- func NewConvertFlightSchedulesAction (s3c * s3. Client ) Action [ConvertFlightSchedulesParams , ConvertFlightSchedulesOutput ] {
37
+ func NewConvertFlightSchedulesAction (s3c MinimalS3Client ) Action [ConvertFlightSchedulesParams , ConvertFlightSchedulesOutput ] {
37
38
return & cfsAction {s3c }
38
39
}
39
40
40
41
func (a * cfsAction ) Handle (ctx context.Context , params ConvertFlightSchedulesParams ) (ConvertFlightSchedulesOutput , error ) {
41
42
ctx , cancel := context .WithCancel (ctx )
42
43
defer cancel ()
43
44
44
- var grouped map [common.LocalDate ][]* common.Flight
45
- {
46
- g , ctx := errgroup .WithContext (ctx )
47
- results := make ([][]* common.Flight , len (params .DateRanges ))
45
+ ch := make (chan * common.Flight , 1024 )
46
+ g , ctx := errgroup .WithContext (ctx )
48
47
49
- for i , r := range params .DateRanges {
50
- g .Go (func () error {
51
- flights , err := a .convertRange (ctx , params .InputBucket , params .InputPrefix , r [0 ], r [1 ])
52
- if err != nil {
53
- return err
54
- }
48
+ for _ , r := range params .DateRanges {
49
+ g .Go (func () error {
50
+ return a .convertRange (ctx , params .InputBucket , params .InputPrefix , r [0 ], r [1 ], ch )
51
+ })
52
+ }
55
53
56
- results [i ] = flights
57
- return nil
58
- })
59
- }
54
+ done := make (chan map [common.LocalDate ][]* common.Flight )
55
+ go func () {
56
+ defer close (done )
60
57
61
- if err := g .Wait (); err != nil {
62
- return ConvertFlightSchedulesOutput {}, err
58
+ result := make (map [common.LocalDate ][]* common.Flight )
59
+ for f := range ch {
60
+ result [f .DepartureDate ()] = append (result [f .DepartureDate ()], f )
63
61
}
64
62
65
- grouped = groupByDepartureDateUTC (results )
63
+ done <- result
64
+ }()
65
+
66
+ if err := func () error { defer close (ch ); return g .Wait () }(); err != nil {
67
+ return ConvertFlightSchedulesOutput {}, err
66
68
}
67
69
68
- g , ctx : = errgroup .WithContext (ctx )
69
- for d , flights := range grouped {
70
+ g , ctx = errgroup .WithContext (ctx )
71
+ for d , flights := range <- done {
70
72
g .Go (func () error {
71
73
return a .upsertFlights (
72
74
ctx ,
@@ -82,36 +84,24 @@ func (a *cfsAction) Handle(ctx context.Context, params ConvertFlightSchedulesPar
82
84
return ConvertFlightSchedulesOutput {}, g .Wait ()
83
85
}
84
86
85
- func (a * cfsAction ) convertRange (ctx context.Context , inputBucket , inputPrefix string , start , end common.LocalDate ) ([]* common.Flight , error ) {
86
- var flights []* common.Flight
87
-
87
+ func (a * cfsAction ) convertRange (ctx context.Context , inputBucket , inputPrefix string , start , end common.LocalDate , ch chan <- * common.Flight ) error {
88
+ g , ctx := errgroup .WithContext (ctx )
88
89
for curr := range start .Until (end ) {
89
- converted , err := a .convertSingle (ctx , inputBucket , inputPrefix , curr )
90
- if err != nil {
91
- return nil , err
92
- }
93
-
94
- flights = append (flights , converted ... )
90
+ g .Go (func () error {
91
+ return a .convertSingle (ctx , inputBucket , inputPrefix , curr , ch )
92
+ })
95
93
}
96
94
97
- return flights , nil
95
+ return g . Wait ()
98
96
}
99
97
100
- func (a * cfsAction ) convertSingle (ctx context.Context , inputBucket , inputPrefix string , d common.LocalDate ) ([]* common.Flight , error ) {
101
- var flights []* common.Flight
102
- {
103
- schedules , err := a .loadFlightSchedules (ctx , inputBucket , inputPrefix , d )
104
- if err != nil {
105
- return nil , err
106
- }
107
-
108
- flights , err = convertFlightSchedulesToFlights (d , schedules )
109
- if err != nil {
110
- return nil , err
111
- }
98
+ func (a * cfsAction ) convertSingle (ctx context.Context , inputBucket , inputPrefix string , d common.LocalDate , ch chan <- * common.Flight ) error {
99
+ schedules , err := a .loadFlightSchedules (ctx , inputBucket , inputPrefix , d )
100
+ if err != nil {
101
+ return err
112
102
}
113
103
114
- return flights , nil
104
+ return convertFlightSchedulesToFlights ( ctx , d , schedules , ch )
115
105
}
116
106
117
107
func (a * cfsAction ) loadFlightSchedules (ctx context.Context , bucket , prefix string , d common.LocalDate ) ([]lufthansa.FlightSchedule , error ) {
@@ -137,20 +127,35 @@ func (a *cfsAction) upsertFlights(ctx context.Context, bucket, prefix string, d
137
127
return err
138
128
}
139
129
140
- added := make (map [common.FlightId ]struct {} )
130
+ added := make (map [common.FlightId ]* common. Flight )
141
131
result := make ([]* common.Flight , 0 , max (len (flights ), len (existing )))
142
132
143
133
for _ , f := range flights {
144
- if _ , ok := added [f .Id ()]; ! ok {
134
+ if addedFlight , ok := added [f .Id ()]; ok {
135
+ if err := combineFlights (addedFlight , f , queryDateRanges ); err != nil {
136
+ return err
137
+ }
138
+ } else {
145
139
result = append (result , f )
146
- added [f .Id ()] = struct {}{}
140
+ added [f .Id ()] = f
147
141
}
148
142
}
149
143
150
144
for _ , f := range existing {
151
- if _ , ok := added [f .Id ()]; ! ok && ! queryDateRanges .Contains (f .QueryDate ) {
152
- result = append (result , f )
153
- added [f .Id ()] = struct {}{}
145
+ if addedFlight , ok := added [f .Id ()]; ok {
146
+ if err := combineFlights (addedFlight , f , queryDateRanges ); err != nil {
147
+ return err
148
+ }
149
+ } else {
150
+ flightQueryDate , err := common .ParseLocalDate (f .DataElements [queryDateId ])
151
+ if err != nil {
152
+ return err
153
+ }
154
+
155
+ if ! queryDateRanges .Contains (flightQueryDate ) {
156
+ result = append (result , f )
157
+ added [f .Id ()] = f
158
+ }
154
159
}
155
160
}
156
161
@@ -189,15 +194,14 @@ func (a *cfsAction) loadFlights(ctx context.Context, bucket, s3Key string) ([]*c
189
194
return flights , json .NewDecoder (resp .Body ).Decode (& flights )
190
195
}
191
196
192
- func convertFlightSchedulesToFlights (queryDate common.LocalDate , schedules []lufthansa.FlightSchedule ) ([] * common.Flight , error ) {
197
+ func convertFlightSchedulesToFlights (ctx context. Context , queryDate common.LocalDate , schedules []lufthansa.FlightSchedule , ch chan <- * common.Flight ) error {
193
198
lookup := make (map [common.FlightId ]* common.Flight )
194
199
codeShareIds := make (map [common.FlightId ]struct {})
195
200
addLater := make (map [common.FlightId ][]* common.Flight )
196
201
197
202
for _ , fs := range schedules {
198
203
for _ , leg := range fs .Legs {
199
204
f := & common.Flight {
200
- QueryDate : queryDate ,
201
205
Airline : common .AirlineIdentifier (fs .Airline ),
202
206
FlightNumber : fs .FlightNumber ,
203
207
Suffix : fs .Suffix ,
@@ -214,18 +218,22 @@ func convertFlightSchedulesToFlights(queryDate common.LocalDate, schedules []luf
214
218
CodeShares : make (map [common.FlightNumber ]map [int ]string ),
215
219
}
216
220
221
+ f .DataElements [queryDateId ] = queryDate .String ()
222
+
217
223
lookup [f .Id ()] = f
218
224
219
225
if codeSharesRaw := f .DataElements [codeShareChildId ]; codeSharesRaw != "" {
220
226
// this flight has codeshares
221
227
for _ , codeShare := range strings .Split (codeSharesRaw , "/" ) {
222
228
codeShareFn , err := common .ParseFlightNumber (codeShare )
223
229
if err != nil {
224
- return nil , err
230
+ return err
225
231
}
226
232
227
233
if _ , ok := f .CodeShares [codeShareFn ]; ! ok {
228
- f .CodeShares [codeShareFn ] = make (map [int ]string )
234
+ f .CodeShares [codeShareFn ] = map [int ]string {
235
+ queryDateId : queryDate .String (),
236
+ }
229
237
}
230
238
231
239
// mark as codeshare
@@ -237,7 +245,7 @@ func convertFlightSchedulesToFlights(queryDate common.LocalDate, schedules []luf
237
245
// this flight is a codeshare
238
246
parentFn , err := common .ParseFlightNumber (codeShare )
239
247
if err != nil {
240
- return nil , err
248
+ return err
241
249
}
242
250
243
251
parentFid := parentFn .Id (f .Departure ())
@@ -265,7 +273,6 @@ func convertFlightSchedulesToFlights(queryDate common.LocalDate, schedules []luf
265
273
// create a parent if the parent itself isn't present
266
274
first := codeShares [0 ]
267
275
f = & common.Flight {
268
- QueryDate : queryDate ,
269
276
Airline : fid .Number .Airline ,
270
277
FlightNumber : fid .Number .Number ,
271
278
Suffix : fid .Number .Suffix ,
@@ -278,8 +285,10 @@ func convertFlightSchedulesToFlights(queryDate common.LocalDate, schedules []luf
278
285
AircraftType : first .AircraftType ,
279
286
AircraftConfigurationVersion : first .AircraftConfigurationVersion ,
280
287
Registration : first .Registration ,
281
- DataElements : make (map [int ]string ),
282
- CodeShares : make (map [common.FlightNumber ]map [int ]string ),
288
+ DataElements : map [int ]string {
289
+ queryDateId : queryDate .String (),
290
+ },
291
+ CodeShares : make (map [common.FlightNumber ]map [int ]string ),
283
292
}
284
293
285
294
lookup [fid ] = f
@@ -290,23 +299,51 @@ func convertFlightSchedulesToFlights(queryDate common.LocalDate, schedules []luf
290
299
}
291
300
}
292
301
293
- flights := make ([]* common.Flight , 0 , len (lookup )- len (codeShareIds ))
294
302
for fid , f := range lookup {
295
303
if _ , ok := codeShareIds [fid ]; ! ok {
296
- flights = append (flights , f )
304
+ select {
305
+ case ch <- f :
306
+ case <- ctx .Done ():
307
+ return ctx .Err ()
308
+ }
297
309
}
298
310
}
299
311
300
- return flights , nil
312
+ return nil
301
313
}
302
314
303
- func groupByDepartureDateUTC (results [][]* common.Flight ) map [common.LocalDate ][]* common.Flight {
304
- grouped := make (map [common.LocalDate ][]* common.Flight )
305
- for _ , result := range results {
306
- for _ , f := range result {
307
- grouped [f .DepartureDate ()] = append (grouped [f .DepartureDate ()], f )
315
+ func combineFlights (f , other * common.Flight , queryDateRanges common.LocalDateRanges ) error {
316
+ otherQueryDate , err := common .ParseLocalDate (other .DataElements [queryDateId ])
317
+ if err != nil {
318
+ return err
319
+ }
320
+
321
+ if ! queryDateRanges .Contains (otherQueryDate ) {
322
+ for k , v := range other .DataElements {
323
+ if _ , ok := f .DataElements [k ]; ! ok {
324
+ f .DataElements [k ] = v
325
+ }
326
+ }
327
+ }
328
+
329
+ for codeShareFn , otherDataElements := range other .CodeShares {
330
+ codeShareQueryDate , err := common .ParseLocalDate (otherDataElements [queryDateId ])
331
+ if err != nil {
332
+ return err
333
+ }
334
+
335
+ if ! queryDateRanges .Contains (codeShareQueryDate ) {
336
+ if dataElements , ok := f .CodeShares [codeShareFn ]; ok {
337
+ for k , v := range otherDataElements {
338
+ if _ , ok := dataElements [k ]; ! ok {
339
+ dataElements [k ] = v
340
+ }
341
+ }
342
+ } else {
343
+ f .CodeShares [codeShareFn ] = otherDataElements
344
+ }
308
345
}
309
346
}
310
347
311
- return grouped
348
+ return nil
312
349
}
0 commit comments