Skip to content

Commit

Permalink
change update metadata action to use schedules
Browse files Browse the repository at this point in the history
  • Loading branch information
its-felix committed Nov 7, 2024
1 parent f4781b8 commit cb06578
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 64 deletions.
3 changes: 1 addition & 2 deletions cdk/lib/constructs/sfn-construct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,9 @@ export class SfnConstruct extends Construct {
'action': 'update_metadata',
'params': {
'inputBucket': props.dataBucket.bucketName,
'inputPrefix': PROCESSED_FLIGHTS_PREFIX,
'inputPrefix': PROCESSED_SCHEDULES_PREFIX,
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': PROCESSED_METADATA_PREFIX,
'dateRanges': JsonPath.objectAt('$.convertSchedulesResponse.dateRanges'),
},
}),
payloadResponseOnly: true,
Expand Down
6 changes: 6 additions & 0 deletions go/common/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ func (s Set[T]) Contains(value T) bool {
_, ok := s[value]
return ok
}

// Remove deletes value from the set and reports whether it was a member
// before the call. Removing a value that is not in the set leaves the set
// unchanged and returns false.
func (s Set[T]) Remove(value T) bool {
	if _, present := s[value]; !present {
		// Nothing to delete; the set is left untouched.
		return false
	}

	delete(s, value)
	return true
}
1 change: 1 addition & 0 deletions go/cron/action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type Action[IN any, OUT any] interface {
// MinimalS3Client is the S3 client abstraction used by this package. It
// declares no methods of its own; its method set is exactly the union of the
// embedded adapt.S3Getter, adapt.S3Putter and adapt.S3Lister interfaces.
// NOTE(review): presumably these cover get-object, put-object and
// list-objects respectively; confirm against the adapt package.
type MinimalS3Client interface {
adapt.S3Getter
adapt.S3Putter
adapt.S3Lister
}
180 changes: 118 additions & 62 deletions go/cron/action/update_metadata.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package action

import (
"compress/gzip"
"context"
"encoding/json"
"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -9,32 +10,34 @@ import (
"github.com/explore-flights/monorepo/go/common/adapt"
"github.com/explore-flights/monorepo/go/common/concurrent"
"github.com/explore-flights/monorepo/go/common/xiter"
"github.com/explore-flights/monorepo/go/common/xtime"
"golang.org/x/sync/errgroup"
"maps"
"slices"
)

type UpdateMetadataParams struct {
InputBucket string `json:"inputBucket"`
InputPrefix string `json:"inputPrefix"`
OutputBucket string `json:"outputBucket"`
OutputPrefix string `json:"outputPrefix"`
DateRanges xtime.LocalDateRanges `json:"dateRanges"`
InputBucket string `json:"inputBucket"`
InputPrefix string `json:"inputPrefix"`
OutputBucket string `json:"outputBucket"`
OutputPrefix string `json:"outputPrefix"`
}

type UpdateMetadataOutput struct {
NewAirports int `json:"newAirports"`
NewAirlines int `json:"newAirlines"`
NewFlightNumbers int `json:"newFlightNumbers"`
NewAircraft int `json:"newAircraft"`
Airports AddedAndRemoved `json:"airports"`
Airlines AddedAndRemoved `json:"airlines"`
FlightNumbers AddedAndRemoved `json:"flightNumbers"`
Aircraft AddedAndRemoved `json:"aircraft"`
}

// AddedAndRemoved holds two counters describing how a stored metadata list
// changed during an update. It is embedded per category in
// UpdateMetadataOutput and serialized to JSON.
type AddedAndRemoved struct {
// Added is the number of values that were not present before the update
// (JSON key "added").
Added int `json:"added"`
// Removed is the number of previously stored values that are not present
// anymore after the update (JSON key "removed").
Removed int `json:"removed"`
}

type metadata struct {
airports map[string]struct{}
airlines map[common.AirlineIdentifier]struct{}
flightNumbers map[common.FlightNumber]struct{}
aircraft map[string]struct{}
airports common.Set[string]
airlines common.Set[common.AirlineIdentifier]
flightNumbers common.Set[common.FlightNumber]
aircraft common.Set[string]
}

type umdAction struct {
Expand All @@ -49,54 +52,85 @@ func (a *umdAction) Handle(ctx context.Context, params UpdateMetadataParams) (Up
ctx, cancel := context.WithCancel(ctx)
defer cancel()

wg := concurrent.WorkGroup[xtime.LocalDate, metadata, metadata]{
files, err := a.listScheduleFiles(ctx, params.InputBucket, params.InputPrefix)
if err != nil {
return UpdateMetadataOutput{}, err
}

wg := concurrent.WorkGroup[[2]string, metadata, metadata]{
Parallelism: 10,
Worker: a.worker(params.InputBucket, params.InputPrefix),
Worker: a.worker,
Combiner: a.combiner,
Finisher: a.finisher,
}

md, err := wg.RunSeq(ctx, params.DateRanges.Iter())
md, err := wg.RunSeq(ctx, xiter.All(files))
if err != nil {
return UpdateMetadataOutput{}, err
}

return a.upsertMetadata(ctx, params.OutputBucket, params.OutputPrefix, md)
return a.updateMetadata(ctx, params.OutputBucket, params.OutputPrefix, md)
}

func (a *umdAction) worker(bucket, prefix string) func(ctx context.Context, d xtime.LocalDate, md metadata) (metadata, error) {
return func(ctx context.Context, d xtime.LocalDate, md metadata) (metadata, error) {
resp, err := a.s3c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(prefix + d.Time(nil).Format("2006/01/02") + ".json"),
})
func (a *umdAction) listScheduleFiles(ctx context.Context, bucket, prefix string) ([][2]string, error) {
files := make([][2]string, 0)

paginator := s3.NewListObjectsV2Paginator(a.s3c, &s3.ListObjectsV2Input{
Bucket: aws.String(bucket),
Prefix: aws.String(prefix),
})

for paginator.HasMorePages() {
resp, err := paginator.NextPage(ctx)
if err != nil {
return md, err
return nil, err
}

defer resp.Body.Close()

var flights []*common.Flight
if err = json.NewDecoder(resp.Body).Decode(&flights); err != nil {
return md, err
for _, obj := range resp.Contents {
files = append(files, [2]string{bucket, *obj.Key})
}
}

md = a.ensureMetadata(md)
for _, f := range flights {
md.airports[f.DepartureAirport] = struct{}{}
md.airports[f.ArrivalAirport] = struct{}{}
return files, nil
}

for fn := range xiter.Combine(xiter.Single(f.Number()), maps.Keys(f.CodeShares)) {
md.airlines[fn.Airline] = struct{}{}
md.flightNumbers[fn] = struct{}{}
}
func (a *umdAction) worker(ctx context.Context, obj [2]string, md metadata) (metadata, error) {
resp, err := a.s3c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(obj[0]),
Key: aws.String(obj[1]),
})

md.aircraft[f.AircraftType] = struct{}{}
}
if err != nil {
return md, err
}

defer resp.Body.Close()

return md, nil
r, err := gzip.NewReader(resp.Body)
if err != nil {
return md, err
}

defer r.Close()

var schedules map[common.FlightNumber]*common.FlightSchedule
if err = json.NewDecoder(r).Decode(&schedules); err != nil {
return md, err
}

md = a.ensureMetadata(md)
for fn, fs := range schedules {
md.airlines[fn.Airline] = struct{}{}
md.flightNumbers[fn] = struct{}{}

for _, variant := range fs.Variants {
md.airports[variant.Data.DepartureAirport] = struct{}{}
md.airports[variant.Data.ArrivalAirport] = struct{}{}
md.aircraft[variant.Data.AircraftType] = struct{}{}
}
}

return md, nil
}

func (a *umdAction) combiner(ctx context.Context, first, second metadata) (metadata, error) {
Expand Down Expand Up @@ -127,73 +161,95 @@ func (a *umdAction) finisher(ctx context.Context, md metadata) (metadata, error)

func (*umdAction) ensureMetadata(md metadata) metadata {
if md.airports == nil {
md.airports = make(map[string]struct{})
md.airports = make(common.Set[string])
}

if md.airlines == nil {
md.airlines = make(map[common.AirlineIdentifier]struct{})
md.airlines = make(common.Set[common.AirlineIdentifier])
}

if md.flightNumbers == nil {
md.flightNumbers = make(map[common.FlightNumber]struct{})
md.flightNumbers = make(common.Set[common.FlightNumber])
}

if md.aircraft == nil {
md.aircraft = make(map[string]struct{})
md.aircraft = make(common.Set[string])
}

return md
}

func (a *umdAction) upsertMetadata(ctx context.Context, bucket, prefix string, md metadata) (UpdateMetadataOutput, error) {
func (a *umdAction) updateMetadata(ctx context.Context, bucket, prefix string, md metadata) (UpdateMetadataOutput, error) {
var output UpdateMetadataOutput

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
var err error
output.NewAirports, err = upsertMetadata(ctx, a.s3c, bucket, prefix, "airports", md.airports)
output.Airports.Added, output.Airports.Removed, err = updateMetadata(ctx, a.s3c, bucket, prefix, "airports", md.airports)
return err
})

g.Go(func() error {
var err error
output.NewAirlines, err = upsertMetadata(ctx, a.s3c, bucket, prefix, "airlines", md.airlines)
output.Airlines.Added, output.Airlines.Removed, err = updateMetadata(ctx, a.s3c, bucket, prefix, "airlines", md.airlines)
return err
})

g.Go(func() error {
var err error
output.NewFlightNumbers, err = upsertMetadata(ctx, a.s3c, bucket, prefix, "flightNumbers", md.flightNumbers)
output.FlightNumbers.Added, output.FlightNumbers.Removed, err = updateMetadata(ctx, a.s3c, bucket, prefix, "flightNumbers", md.flightNumbers)
return err
})

g.Go(func() error {
var err error
output.NewAircraft, err = upsertMetadata(ctx, a.s3c, bucket, prefix, "aircraft", md.aircraft)
output.Aircraft.Added, output.Aircraft.Removed, err = updateMetadata(ctx, a.s3c, bucket, prefix, "aircraft", md.aircraft)
return err
})

return output, g.Wait()
}

func upsertMetadata[T comparable](ctx context.Context, s3c MinimalS3Client, bucket, prefix, name string, newValues map[T]struct{}) (int, error) {
func updateMetadata[T comparable](ctx context.Context, s3c MinimalS3Client, bucket, prefix, name string, newValues common.Set[T]) (int, int, error) {
key := prefix + name + ".json"
existingValues, err := loadExisting[T](ctx, s3c, bucket, key)
if err != nil {
return 0, 0, err
}

var existingValues []T
err := adapt.S3GetJson(ctx, s3c, bucket, key, &existingValues)
result := make([]T, 0, len(newValues))
added := 0

if err != nil && !adapt.IsS3NotFound(err) {
return 0, err
for value := range newValues {
result = append(result, value)

if !existingValues.Remove(value) {
// added are all those that were not present before
added += 1
}
}

newValueCount := len(newValues)
for _, v := range existingValues {
if _, ok := newValues[v]; ok {
newValueCount--
// removed are all those that are not present anymore
removed := len(existingValues)

return added, removed, adapt.S3PutJson(ctx, s3c, bucket, key, result)
}

func loadExisting[T comparable](ctx context.Context, s3c MinimalS3Client, bucket, key string) (common.Set[T], error) {
var existingValues []T
err := adapt.S3GetJson(ctx, s3c, bucket, key, &existingValues)
if err != nil {
if adapt.IsS3NotFound(err) {
return make(common.Set[T]), nil
} else {
newValues[v] = struct{}{}
return nil, err
}
}

return newValueCount, adapt.S3PutJson(ctx, s3c, bucket, key, slices.AppendSeq(make([]T, 0, len(newValues)), maps.Keys(newValues)))
values := make(common.Set[T])
for _, v := range existingValues {
values[v] = struct{}{}
}

return values, nil
}

0 comments on commit cb06578

Please sign in to comment.