Skip to content

Commit

Permalink
load schedules in full instead of streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
its-felix committed Nov 7, 2024
1 parent fc4e69c commit e3efbf5
Showing 1 changed file with 45 additions and 60 deletions.
105 changes: 45 additions & 60 deletions go/api/data/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,25 +346,16 @@ func (h *Handler) Aircraft(ctx context.Context) ([]Aircraft, error) {
}

func (h *Handler) FlightSchedule(ctx context.Context, fn common.FlightNumber) (*common.FlightSchedule, error) {
var fs *common.FlightSchedule
return fs, h.flightSchedules(ctx, fn.Airline, func(seq iter.Seq[jstream.KV]) error {
for kv := range seq {
if kv.Key == fn.String() {
b, err := json.Marshal(kv.Value)
if err != nil {
return err
}

if err = json.Unmarshal(b, &fs); err != nil {
return err
}
schedules, err := h.flightSchedulesFull(ctx, fn.Airline)
if err != nil {
return nil, err
}

return nil
}
}
if schedules == nil {
return nil, nil
}

return nil
})
return schedules[fn], nil
}

func (h *Handler) Flight(ctx context.Context, fn common.FlightNumber, departureDateUTC xtime.LocalDate, departureAirport string, allowCodeShare bool) (*common.Flight, time.Time, error) {
Expand Down Expand Up @@ -512,22 +503,15 @@ func (h *Handler) seatMapS3Key(airline common.AirlineIdentifier, aircraftType, a
}

func (h *Handler) QuerySchedules(ctx context.Context, airline common.AirlineIdentifier, aircraftType, aircraftConfigurationVersion string) (map[common.FlightNumber][]RouteAndRange, error) {
result := make(map[common.FlightNumber][]RouteAndRange)
return result, h.flightSchedules(ctx, airline, func(seq iter.Seq[jstream.KV]) error {
for kv := range seq {
b, err := json.Marshal(kv.Value)
if err != nil {
return err
}

var fs *common.FlightSchedule
if err = json.Unmarshal(b, &fs); err != nil {
return err
}
schedules, err := h.flightSchedulesFull(ctx, airline)
if err != nil {
return nil, err
}

result := make(map[common.FlightNumber][]RouteAndRange)
if schedules != nil {
for fn, fs := range schedules {
for _, variant := range fs.Variants {
fn := fs.Number()

if variant.Data.ServiceType == "J" && variant.Data.AircraftType == aircraftType && variant.Data.AircraftConfigurationVersion == aircraftConfigurationVersion && variant.Data.OperatedAs == fn {
if cnt, span := variant.Ranges.Span(); cnt > 0 {
idx := slices.IndexFunc(result[fn], func(rr RouteAndRange) bool {
Expand All @@ -553,38 +537,12 @@ func (h *Handler) QuerySchedules(ctx context.Context, airline common.AirlineIden
}
}
}
}

return nil
})
}

func (h *Handler) FlightSchedules(ctx context.Context, airline common.AirlineIdentifier, fn func(seq iter.Seq[*common.FlightSchedule]) error) error {
return h.flightSchedules(ctx, airline, func(seq iter.Seq[jstream.KV]) error {
var internalErr error
err := fn(func(yield func(*common.FlightSchedule) bool) {
for kv := range seq {
var b []byte
b, internalErr = json.Marshal(kv.Value)
if internalErr != nil {
return
}

var fs *common.FlightSchedule
if internalErr = json.Unmarshal(b, &fs); internalErr != nil {
return
}

if !yield(fs) {
return
}
}
})

return errors.Join(internalErr, err)
})
return result, nil
}

func (h *Handler) flightSchedules(ctx context.Context, airline common.AirlineIdentifier, fn func(seq iter.Seq[jstream.KV]) error) error {
func (h *Handler) flightSchedulesStream(ctx context.Context, airline common.AirlineIdentifier, fn func(seq iter.Seq[jstream.KV]) error) error {
resp, err := h.s3c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(h.bucket),
Key: aws.String(fmt.Sprintf("processed/schedules/%s.json.gz", airline)),
Expand Down Expand Up @@ -625,6 +583,33 @@ func (h *Handler) flightSchedules(ctx context.Context, airline common.AirlineIde
return nil
}

func (h *Handler) flightSchedulesFull(ctx context.Context, airline common.AirlineIdentifier) (map[common.FlightNumber]*common.FlightSchedule, error) {
resp, err := h.s3c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(h.bucket),
Key: aws.String(fmt.Sprintf("processed/schedules/%s.json.gz", airline)),
})

if err != nil {
if adapt.IsS3NotFound(err) {
return nil, nil
} else {
return nil, err
}
}

defer resp.Body.Close()

r, err := gzip.NewReader(resp.Body)
if err != nil {
return nil, err
}

defer r.Close()

var schedules map[common.FlightNumber]*common.FlightSchedule
return schedules, json.NewDecoder(r).Decode(&schedules)
}

func (h *Handler) loadCsv(ctx context.Context, name string) (*csvReader, error) {
resp, err := h.s3c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(h.bucket),
Expand Down

0 comments on commit e3efbf5

Please sign in to comment.