Skip to content

Commit

Permalink
fix context cancellation issue
Browse files Browse the repository at this point in the history
  • Loading branch information
its-felix committed Aug 24, 2024
1 parent f59267c commit 3973ae0
Showing 1 changed file with 35 additions and 19 deletions.
54 changes: 35 additions & 19 deletions go/cron/action/convert_flight_schedules.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/explore-flights/monorepo/go/common"
Expand Down Expand Up @@ -42,12 +43,21 @@ func (a *cfsAction) Handle(ctx context.Context, params ConvertFlightSchedulesPar
ctx, cancel := context.WithCancel(ctx)
defer cancel()

flightsByDepartureDateUTC, err := a.convertAll(ctx, params.InputBucket, params.InputPrefix, params.DateRanges)
if err != nil {
return ConvertFlightSchedulesOutput{}, err
}

return ConvertFlightSchedulesOutput{}, a.upsertAll(ctx, params.OutputBucket, params.OutputPrefix, params.DateRanges, flightsByDepartureDateUTC)
}

func (a *cfsAction) convertAll(ctx context.Context, inputBucket, inputPrefix string, dateRanges common.LocalDateRanges) (map[common.LocalDate][]*common.Flight, error) {
ch := make(chan *common.Flight, 1024)
g, ctx := errgroup.WithContext(ctx)
g, gCtx := errgroup.WithContext(ctx)

for _, r := range params.DateRanges {
for _, r := range dateRanges {
g.Go(func() error {
return a.convertRange(ctx, params.InputBucket, params.InputPrefix, r[0], r[1], ch)
return a.convertRange(gCtx, inputBucket, inputPrefix, r[0], r[1], ch)
})
}

Expand All @@ -64,24 +74,10 @@ func (a *cfsAction) Handle(ctx context.Context, params ConvertFlightSchedulesPar
}()

if err := func() error { defer close(ch); return g.Wait() }(); err != nil {
return ConvertFlightSchedulesOutput{}, err
}

g, ctx = errgroup.WithContext(ctx)
for d, flights := range <-done {
g.Go(func() error {
return a.upsertFlights(
ctx,
params.OutputBucket,
params.OutputPrefix,
d,
params.DateRanges,
flights,
)
})
return nil, err
}

return ConvertFlightSchedulesOutput{}, g.Wait()
return <-done, nil
}

func (a *cfsAction) convertRange(ctx context.Context, inputBucket, inputPrefix string, start, end common.LocalDate, ch chan<- *common.Flight) error {
Expand Down Expand Up @@ -120,6 +116,24 @@ func (a *cfsAction) loadFlightSchedules(ctx context.Context, bucket, prefix stri
return schedules, json.NewDecoder(resp.Body).Decode(&schedules)
}

func (a *cfsAction) upsertAll(ctx context.Context, bucket, prefix string, queryDateRanges common.LocalDateRanges, flightsByDepartureDateUTC map[common.LocalDate][]*common.Flight) error {
g, ctx := errgroup.WithContext(ctx)
for d, flights := range flightsByDepartureDateUTC {
g.Go(func() error {
return a.upsertFlights(
ctx,
bucket,
prefix,
d,
queryDateRanges,
flights,
)
})
}

return g.Wait()
}

func (a *cfsAction) upsertFlights(ctx context.Context, bucket, prefix string, d common.LocalDate, queryDateRanges common.LocalDateRanges, flights []*common.Flight) error {
s3Key := prefix + d.Time(nil).Format("2006/01/02") + ".json"
existing, err := a.loadFlights(ctx, bucket, s3Key)
Expand Down Expand Up @@ -181,6 +195,8 @@ func (a *cfsAction) loadFlights(ctx context.Context, bucket, s3Key string) ([]*c
})

if err != nil {
fmt.Println("err during loadFlights")

if adapt.IsS3NotFound(err) {
return nil, nil
} else {
Expand Down

0 comments on commit 3973ae0

Please sign in to comment.