diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index aa7483c..b2620ab 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -68,7 +68,8 @@ jobs: GOOS: 'linux' GOARCH: 'arm64' # keep this in sync with the arch configured in CDK! CGO_ENABLED: '0' - run: 'go build -o bootstrap -tags "lambda.norpc"' + run: | + go build -o bootstrap -tags "lambda,lambda.norpc" - name: 'Store cron artifact' uses: actions/upload-artifact@v4 with: diff --git a/go/api/auth/repo.go b/go/api/auth/repo.go index 44fdb24..b17e5d5 100644 --- a/go/api/auth/repo.go +++ b/go/api/auth/repo.go @@ -4,7 +4,7 @@ import ( "context" "errors" "fmt" - "github.com/explore-flights/monorepo/go/api/adapt" + "github.com/explore-flights/monorepo/go/common/adapt" "github.com/gofrs/uuid/v5" "io" "net/url" diff --git a/go/api/data/handler.go b/go/api/data/handler.go index 77f5c0a..583ea88 100644 --- a/go/api/data/handler.go +++ b/go/api/data/handler.go @@ -8,8 +8,8 @@ import ( "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/explore-flights/monorepo/go/api/adapt" "github.com/explore-flights/monorepo/go/common" + "github.com/explore-flights/monorepo/go/common/adapt" "github.com/explore-flights/monorepo/go/common/lufthansa" "io" "slices" diff --git a/go/api/go.mod b/go/api/go.mod index 79ef162..5b5f40c 100644 --- a/go/api/go.mod +++ b/go/api/go.mod @@ -5,7 +5,7 @@ go 1.23 require ( github.com/aws/aws-sdk-go-v2 v1.30.4 github.com/aws/aws-sdk-go-v2/config v1.27.28 - github.com/aws/aws-sdk-go-v2/service/s3 v1.59.0 + github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 github.com/aws/aws-sdk-go-v2/service/ssm v1.52.5 github.com/explore-flights/monorepo/go/common v0.0.0 github.com/goccy/go-graphviz v0.1.3 diff --git a/go/api/go.sum b/go/api/go.sum index 5953c60..dcc6464 100644 --- a/go/api/go.sum +++ b/go/api/go.sum @@ -24,8 +24,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 h1:tJ5RnkHC github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18/go.mod h1:++NHzT+nAF7ZPrHPsA+ENvsXkOO8wEu+C6RXltAG4/c= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 h1:jg16PhLPUiHIj8zYIW6bqzeQSuHVEiWnGA0Brz5Xv2I= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16/go.mod h1:Uyk1zE1VVdsHSU7096h/rwnXDzOzYQVl+FNPhPw7ShY= -github.com/aws/aws-sdk-go-v2/service/s3 v1.59.0 h1:Cso4Ev/XauMVsbwdhYEoxg8rxZWw43CFqqaPB5w3W2c= -github.com/aws/aws-sdk-go-v2/service/s3 v1.59.0/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 h1:mx2ucgtv+MWzJesJY9Ig/8AFHgoE5FwLXwUVgW/FGdI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI= github.com/aws/aws-sdk-go-v2/service/ssm v1.52.5 h1:eY1n+pyBbgqRBRnpVUg0QguAGMWVLQp2n+SfjjOJuQI= github.com/aws/aws-sdk-go-v2/service/ssm v1.52.5/go.mod h1:Bw2YSeqq/I4VyVs9JSfdT9ArqyAbQkJEwj13AVm0heg= github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 h1:zCsFCKvbj25i7p1u94imVoO447I/sFv8qq+lGJhRN0c= diff --git a/go/api/local/s3.go b/go/api/local/s3.go index 7a56dc5..98c9dfa 100644 --- a/go/api/local/s3.go +++ b/go/api/local/s3.go @@ -5,7 +5,7 @@ package local import ( "context" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/explore-flights/monorepo/go/api/adapt" + "github.com/explore-flights/monorepo/go/common/adapt" "io" "os" "path/filepath" diff --git a/go/api/search/export.go b/go/api/search/export.go index e478d6a..b8d9c90 100644 --- a/go/api/search/export.go +++ b/go/api/search/export.go @@ -79,9 +79,9 @@ func buildConnectionsResponse(conns []Connection, flights map[string]FlightRespo return r } -func convertCodeShares(inp []common.FlightNumber) []FlightNumberResponse { +func convertCodeShares(inp map[common.FlightNumberStr]map[int]string) []FlightNumberResponse { r := make([]FlightNumberResponse, 0, len(inp)) - for _, fn := range inp { + for fn := range inp { r = append(r, FlightNumberResponse{ Airline: string(fn.Airline), Number: fn.Number, diff --git a/go/api/search/repo.go b/go/api/search/repo.go index 4e9e04f..f2c99c3 100644 --- a/go/api/search/repo.go +++ b/go/api/search/repo.go @@ -5,8 +5,8 @@ import ( "encoding/json" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/explore-flights/monorepo/go/api/adapt" "github.com/explore-flights/monorepo/go/common" + "github.com/explore-flights/monorepo/go/common/adapt" "github.com/explore-flights/monorepo/go/common/concurrent" "golang.org/x/sync/errgroup" "sync" diff --git a/go/api/web/data.go b/go/api/web/data.go index 62db530..d6f71c4 100644 --- a/go/api/web/data.go +++ b/go/api/web/data.go @@ -3,9 +3,9 @@ package web import ( "context" "errors" - "github.com/explore-flights/monorepo/go/api/adapt" "github.com/explore-flights/monorepo/go/api/data" "github.com/explore-flights/monorepo/go/common" + "github.com/explore-flights/monorepo/go/common/adapt" "github.com/labstack/echo/v4" "net/http" ) diff --git a/go/api/adapt/s3.go b/go/common/adapt/s3.go similarity index 100% rename from go/api/adapt/s3.go rename to go/common/adapt/s3.go diff --git a/go/api/adapt/s3_lambda.go b/go/common/adapt/s3_lambda.go similarity index 100% rename from go/api/adapt/s3_lambda.go rename to go/common/adapt/s3_lambda.go diff --git a/go/api/adapt/s3_local.go b/go/common/adapt/s3_local.go similarity index 100% rename from go/api/adapt/s3_local.go rename to go/common/adapt/s3_local.go diff --git a/go/common/flight.go b/go/common/flight.go index 6323dda..2114785 100644 --- a/go/common/flight.go +++ b/go/common/flight.go @@ -1,6 +1,7 @@ package common import ( + "encoding/json" "fmt" "regexp" "strconv" @@ -49,26 +50,48 @@ func (f FlightNumber) Id(dep Departure) FlightId { } } +type FlightNumberStr FlightNumber + +func (f *FlightNumberStr) UnmarshalJSON(b []byte) error { + var s string + if err := json.Unmarshal(b, &s); err != nil { + return err + } + + fn, err := ParseFlightNumber(s) + if err != nil { + return err + } + + *f = FlightNumberStr(fn) + return nil +} + +func (f FlightNumberStr) MarshalJSON() ([]byte, error) { + return json.Marshal(FlightNumber(f).String()) +} + type FlightId struct { Number FlightNumber `json:"number"` Departure Departure `json:"departure"` } type Flight struct { - Airline AirlineIdentifier `json:"airline"` - FlightNumber int `json:"flightNumber"` - Suffix string `json:"suffix"` - DepartureTime time.Time `json:"departureTime"` - DepartureAirport string `json:"departureAirport"` - ArrivalTime time.Time `json:"arrivalTime"` - ArrivalAirport string `json:"arrivalAirport"` - ServiceType string `json:"serviceType"` - AircraftOwner AirlineIdentifier `json:"aircraftOwner"` - AircraftType string `json:"aircraftType"` - AircraftConfigurationVersion string `json:"aircraftConfigurationVersion"` - Registration string `json:"registration"` - DataElements map[int]string `json:"dataElements"` - CodeShares []FlightNumber `json:"codeShares"` + QueryDate LocalDate `json:"queryDate"` + Airline AirlineIdentifier `json:"airline"` + FlightNumber int `json:"flightNumber"` + Suffix string `json:"suffix"` + DepartureTime time.Time `json:"departureTime"` + DepartureAirport string `json:"departureAirport"` + ArrivalTime time.Time `json:"arrivalTime"` + ArrivalAirport string `json:"arrivalAirport"` + ServiceType string `json:"serviceType"` + AircraftOwner AirlineIdentifier `json:"aircraftOwner"` + AircraftType string `json:"aircraftType"` + AircraftConfigurationVersion string `json:"aircraftConfigurationVersion"` + Registration string `json:"registration"` + DataElements map[int]string `json:"dataElements"` + CodeShares map[FlightNumberStr]map[int]string `json:"codeShares"` } func (f *Flight) DepartureDate() LocalDate { diff --git a/go/common/flight_schedule.go b/go/common/flight_schedule.go new file mode 100644 index 0000000..e75c7e6 --- /dev/null +++ b/go/common/flight_schedule.go @@ -0,0 +1,59 @@ +package common + +import ( + "maps" +) + +type FlightScheduleData struct { + DepartureTime OffsetTime `json:"departureTime"` + DepartureAirport string `json:"departureAirport"` + ArrivalTime OffsetTime `json:"arrivalTime"` + ArrivalAirport string `json:"arrivalAirport"` + ServiceType string `json:"serviceType"` + AircraftOwner AirlineIdentifier `json:"aircraftOwner"` + AircraftType string `json:"aircraftType"` + AircraftConfigurationVersion string `json:"aircraftConfigurationVersion"` + Registration string `json:"registration"` + DataElements map[int]string `json:"dataElements"` + CodeShares []FlightNumber `json:"codeShares"` +} + +func (fsd FlightScheduleData) Equal(other FlightScheduleData) bool { + return fsd.DepartureTime == other.DepartureTime && + fsd.DepartureAirport == other.DepartureAirport && + fsd.ArrivalTime == other.ArrivalTime && + fsd.ArrivalAirport == other.ArrivalAirport && + fsd.ServiceType == other.ServiceType && + fsd.AircraftOwner == other.AircraftOwner && + fsd.AircraftType == other.AircraftType && + fsd.AircraftConfigurationVersion == other.AircraftConfigurationVersion && + fsd.Registration == other.Registration && + maps.Equal(fsd.DataElements, other.DataElements) && + SliceEqualContent(fsd.CodeShares, other.CodeShares) +} + +type FlightScheduleVariant struct { + Ranges []LocalDateRange `json:"ranges"` + Data FlightScheduleData `json:"data"` +} + +func (fsv *FlightScheduleVariant) Expand(d LocalDate) { + +} + +type FlightSchedule struct { + Airline AirlineIdentifier `json:"airline"` + FlightNumber int `json:"flightNumber"` + Suffix string `json:"suffix"` + Variants []*FlightScheduleVariant `json:"variants"` +} + +func (fs *FlightSchedule) DataVariant(fsd FlightScheduleData) *FlightScheduleVariant { + for _, variant := range fs.Variants { + if variant.Data.Equal(fsd) { + return variant + } + } + + return nil +} diff --git a/go/common/go.mod b/go/common/go.mod index 3d63030..bc49866 100644 --- a/go/common/go.mod +++ b/go/common/go.mod @@ -3,9 +3,22 @@ module github.com/explore-flights/monorepo/go/common go 1.23 require ( + github.com/aws/aws-sdk-go-v2 v1.30.4 + github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 github.com/go-jose/go-jose/v4 v4.0.4 github.com/golang-jwt/jwt/v5 v5.2.1 golang.org/x/time v0.6.0 ) -require golang.org/x/crypto v0.26.0 // indirect +require ( + github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 // indirect + github.com/aws/smithy-go v1.20.4 // indirect + golang.org/x/crypto v0.26.0 // indirect +) diff --git a/go/common/go.sum b/go/common/go.sum index c8a5da2..39e30cd 100644 --- a/go/common/go.sum +++ b/go/common/go.sum @@ -1,3 +1,25 @@ +github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= +github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4 h1:70PVAiL15/aBMh5LThwgXdSQorVr91L127ttckI9QQU= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.4/go.mod h1:/MQxMqci8tlqDH+pjmoLu1i0tbWCUP1hhyMRuFxpQCw= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16 h1:mimdLQkIX1zr8GIPY1ZtALdBQGxcASiBd2MOp8m/dMc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.16/go.mod h1:YHk6owoSwrIsok+cAH9PENCOGoH5PU2EllX4vLtSrsY= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18 h1:GckUnpm4EJOAio1c8o25a+b3lVfwVzC9gnSBqiiNmZM= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.18/go.mod h1:Br6+bxfG33Dk3ynmkhsW2Z/t9D4+lRqdLDNCKi85w0U= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 h1:tJ5RnkHCiSH0jyd6gROjlJtNwov0eGYNz8s8nFcR0jQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18/go.mod h1:++NHzT+nAF7ZPrHPsA+ENvsXkOO8wEu+C6RXltAG4/c= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 h1:jg16PhLPUiHIj8zYIW6bqzeQSuHVEiWnGA0Brz5Xv2I= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16/go.mod h1:Uyk1zE1VVdsHSU7096h/rwnXDzOzYQVl+FNPhPw7ShY= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 h1:mx2ucgtv+MWzJesJY9Ig/8AFHgoE5FwLXwUVgW/FGdI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-jose/go-jose/v4 v4.0.4 h1:VsjPI33J0SB9vQM6PLmNjoHqMQNGPiZ0rHL7Ni7Q6/E= diff --git a/go/common/localdate.go b/go/common/localdate.go index 4105eb2..f4e69a7 100644 --- a/go/common/localdate.go +++ b/go/common/localdate.go @@ -79,3 +79,25 @@ func (ld *LocalDate) UnmarshalJSON(data []byte) error { func (ld LocalDate) MarshalJSON() ([]byte, error) { return json.Marshal(ld.String()) } + +type LocalDateRange [2]LocalDate + +func (ldr LocalDateRange) Iter() iter.Seq[LocalDate] { + return ldr[0].Until(ldr[1]) +} + +func (ldr LocalDateRange) Contains(d LocalDate) bool { + return ldr[0].Compare(d) <= 0 && ldr[1].Compare(d) >= 0 +} + +type LocalDateRanges []LocalDateRange + +func (ldrs LocalDateRanges) Contains(d LocalDate) bool { + for _, ldr := range ldrs { + if ldr.Contains(d) { + return true + } + } + + return false +} diff --git a/go/common/time.go b/go/common/time.go new file mode 100644 index 0000000..48e981d --- /dev/null +++ b/go/common/time.go @@ -0,0 +1,79 @@ +package common + +import ( + "encoding/json" + "time" +) + +const offsetTimeFormat = "15:04:05Z07:00" + +type OffsetTime struct { + Hour int + Min int + Sec int + Loc *time.Location +} + +func NewOffsetTime(t time.Time) OffsetTime { + hour, minute, sec := t.Clock() + return OffsetTime{ + Hour: hour, + Min: minute, + Sec: sec, + Loc: t.Location(), + } +} + +func ParseOffsetTime(v string) (OffsetTime, error) { + t, err := time.Parse(offsetTimeFormat, v) + if err != nil { + return OffsetTime{}, err + } + + return OffsetTime{ + Hour: t.Hour(), + Min: t.Minute(), + Sec: t.Second(), + Loc: t.Location(), + }, nil +} + +func MustParseOffsetTime(v string) OffsetTime { + t, err := ParseOffsetTime(v) + if err != nil { + panic(err) + } + + return t +} + +func (t OffsetTime) Time(d LocalDate) time.Time { + return time.Date(d.Year, d.Month, d.Day, t.Hour, t.Min, t.Sec, 0, t.Loc) +} + +func (t OffsetTime) String() string { + return t.Time(LocalDate{}).Format(offsetTimeFormat) +} + +func (t *OffsetTime) UnmarshalJSON(data []byte) error { + var v string + if err := json.Unmarshal(data, &v); err != nil { + return err + } + + r, err := time.Parse(offsetTimeFormat, v) + if err != nil { + return err + } + + *t = NewOffsetTime(r) + return nil +} + +func (t OffsetTime) MarshalJSON() ([]byte, error) { + return json.Marshal(t.String()) +} + +func SplitTime(t time.Time) (LocalDate, OffsetTime) { + return NewLocalDate(t), NewOffsetTime(t) +} diff --git a/go/common/time_test.go b/go/common/time_test.go new file mode 100644 index 0000000..d4ff733 --- /dev/null +++ b/go/common/time_test.go @@ -0,0 +1,17 @@ +package common + +import ( + "testing" + "time" +) + +func TestSplitTime(t *testing.T) { + now := time.Now().Truncate(time.Second) + d, ot := SplitTime(now) + restored := ot.Time(d) + + if now != restored { + t.Fatalf("Restored time does not match the original time: %v != %v", now, restored) + return + } +} diff --git a/go/common/util.go b/go/common/util.go new file mode 100644 index 0000000..bb49b3b --- /dev/null +++ b/go/common/util.go @@ -0,0 +1,24 @@ +package common + +import "slices" + +func SliceEqualContent[S ~[]E, E comparable](s1, s2 S) bool { + if len(s1) != len(s2) { + return false + } + + s1 = slices.Clone(s1) + s2 = slices.Clone(s2) + + for len(s1) > 0 && len(s2) > 0 { + idx := slices.Index(s2, s1[0]) + if idx == -1 { + return false + } + + s1 = s1[1:] + s2 = slices.Delete(s2, idx, idx+1) + } + + return len(s1) == 0 && len(s2) == 0 +} diff --git a/go/common/util_test.go b/go/common/util_test.go new file mode 100644 index 0000000..9e20e5c --- /dev/null +++ b/go/common/util_test.go @@ -0,0 +1,12 @@ +package common + +import "testing" + +func TestSliceEqualContent(t *testing.T) { + s1 := []string{"a", "b", "c", "d"} + s2 := []string{"a", "d", "c", "b"} + + if !SliceEqualContent(s1, s2) { + t.Fatal("slices should be equal") + } +} diff --git a/go/cron/action/convert_flight_schedules.go b/go/cron/action/convert_flight_schedules.go index d32c83d..217751e 100644 --- a/go/cron/action/convert_flight_schedules.go +++ b/go/cron/action/convert_flight_schedules.go @@ -7,9 +7,9 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/explore-flights/monorepo/go/common" + "github.com/explore-flights/monorepo/go/common/adapt" "github.com/explore-flights/monorepo/go/common/lufthansa" "golang.org/x/sync/errgroup" - "slices" "strings" ) @@ -19,11 +19,11 @@ const ( ) type ConvertFlightSchedulesParams struct { - InputBucket string `json:"inputBucket"` - InputPrefix string `json:"inputPrefix"` - OutputBucket string `json:"outputBucket"` - OutputPrefix string `json:"outputPrefix"` - DateRanges [][2]common.LocalDate `json:"dateRanges"` + InputBucket string `json:"inputBucket"` + InputPrefix string `json:"inputPrefix"` + OutputBucket string `json:"outputBucket"` + OutputPrefix string `json:"outputPrefix"` + DateRanges common.LocalDateRanges `json:"dateRanges"` } type ConvertFlightSchedulesOutput struct { @@ -41,17 +41,40 @@ func (a *cfsAction) Handle(ctx context.Context, params ConvertFlightSchedulesPar ctx, cancel := context.WithCancel(ctx) defer cancel() + var grouped map[common.LocalDate][]*common.Flight + { + g, ctx := errgroup.WithContext(ctx) + results := make([][]*common.Flight, len(params.DateRanges)) + + for i, r := range params.DateRanges { + g.Go(func() error { + flights, err := a.convertRange(ctx, params.InputBucket, params.InputPrefix, r[0], r[1]) + if err != nil { + return err + } + + results[i] = flights + return nil + }) + } + + if err := g.Wait(); err != nil { + return ConvertFlightSchedulesOutput{}, err + } + + grouped = groupByDepartureDateUTC(results) + } + g, ctx := errgroup.WithContext(ctx) - for _, r := range params.DateRanges { + for d, flights := range grouped { g.Go(func() error { - return a.convertRange( + return a.upsertFlights( ctx, - params.InputBucket, - params.InputPrefix, params.OutputBucket, params.OutputPrefix, - r[0], - r[1], + d, + params.DateRanges, + flights, ) }) } @@ -59,31 +82,36 @@ func (a *cfsAction) Handle(ctx context.Context, params ConvertFlightSchedulesPar return ConvertFlightSchedulesOutput{}, g.Wait() } -func (a *cfsAction) convertRange(ctx context.Context, inputBucket, inputPrefix, outputBucket, outputPrefix string, start, end common.LocalDate) error { +func (a *cfsAction) convertRange(ctx context.Context, inputBucket, inputPrefix string, start, end common.LocalDate) ([]*common.Flight, error) { + var flights []*common.Flight + for curr := range start.Until(end) { - if err := a.convertSingle(ctx, inputBucket, inputPrefix, outputBucket, outputPrefix, curr); err != nil { - return err + converted, err := a.convertSingle(ctx, inputBucket, inputPrefix, curr) + if err != nil { + return nil, err } + + flights = append(flights, converted...) } - return nil + return flights, nil } -func (a *cfsAction) convertSingle(ctx context.Context, inputBucket, inputPrefix, outputBucket, outputPrefix string, d common.LocalDate) error { +func (a *cfsAction) convertSingle(ctx context.Context, inputBucket, inputPrefix string, d common.LocalDate) ([]*common.Flight, error) { var flights []*common.Flight { schedules, err := a.loadFlightSchedules(ctx, inputBucket, inputPrefix, d) if err != nil { - return err + return nil, err } - flights, err = convertFlightSchedulesToFlights(schedules) + flights, err = convertFlightSchedulesToFlights(d, schedules) if err != nil { - return err + return nil, err } } - return a.saveFlights(ctx, outputBucket, outputPrefix, d, flights) + return flights, nil } func (a *cfsAction) loadFlightSchedules(ctx context.Context, bucket, prefix string, d common.LocalDate) ([]lufthansa.FlightSchedule, error) { @@ -102,15 +130,36 @@ func (a *cfsAction) loadFlightSchedules(ctx context.Context, bucket, prefix stri return schedules, json.NewDecoder(resp.Body).Decode(&schedules) } -func (a *cfsAction) saveFlights(ctx context.Context, bucket, prefix string, d common.LocalDate, flights []*common.Flight) error { - b, err := json.Marshal(flights) +func (a *cfsAction) upsertFlights(ctx context.Context, bucket, prefix string, d common.LocalDate, queryDateRanges common.LocalDateRanges, flights []*common.Flight) error { + s3Key := prefix + d.Time(nil).Format("2006/01/02") + ".json" + existing, err := a.loadFlights(ctx, bucket, s3Key) + if err != nil { + return err + } + + added := make(map[common.FlightId]struct{}) + result := make([]*common.Flight, 0, max(len(flights), len(existing))) + + for _, f := range flights { + result = append(result, f) + added[f.Id()] = struct{}{} + } + + for _, f := range existing { + if _, ok := added[f.Id()]; !ok && !queryDateRanges.Contains(f.QueryDate) { + result = append(result, f) + added[f.Id()] = struct{}{} + } + } + + b, err := json.Marshal(result) if err != nil { return err } _, err = a.s3c.PutObject(ctx, &s3.PutObjectInput{ Bucket: aws.String(bucket), - Key: aws.String(prefix + d.Time(nil).Format("2006/01/02") + ".json"), + Key: aws.String(s3Key), ContentType: aws.String("application/json"), Body: bytes.NewReader(b), }) @@ -118,14 +167,35 @@ func (a *cfsAction) saveFlights(ctx context.Context, bucket, prefix string, d co return err } -func convertFlightSchedulesToFlights(schedules []lufthansa.FlightSchedule) ([]*common.Flight, error) { - flights := make([]*common.Flight, 0, len(schedules)) +func (a *cfsAction) loadFlights(ctx context.Context, bucket, s3Key string) ([]*common.Flight, error) { + resp, err := a.s3c.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(s3Key), + }) + + if err != nil { + if adapt.IsS3NotFound(err) { + return nil, nil + } else { + return nil, err + } + } + + defer resp.Body.Close() + + var flights []*common.Flight + return flights, json.NewDecoder(resp.Body).Decode(&flights) +} + +func convertFlightSchedulesToFlights(queryDate common.LocalDate, schedules []lufthansa.FlightSchedule) ([]*common.Flight, error) { lookup := make(map[common.FlightId]*common.Flight) + codeShareIds := make(map[common.FlightId]struct{}) addLater := make(map[common.FlightId][]*common.Flight) for _, fs := range schedules { for _, leg := range fs.Legs { f := &common.Flight{ + QueryDate: queryDate, Airline: common.AirlineIdentifier(fs.Airline), FlightNumber: fs.FlightNumber, Suffix: fs.Suffix, @@ -139,89 +209,102 @@ func convertFlightSchedulesToFlights(schedules []lufthansa.FlightSchedule) ([]*c AircraftConfigurationVersion: leg.AircraftConfigurationVersion, Registration: leg.Registration, DataElements: fs.DataElementsForSequence(leg.SequenceNumber), - CodeShares: make([]common.FlightNumber, 0), + CodeShares: make(map[common.FlightNumberStr]map[int]string), } - for _, codeShare := range strings.Split(f.DataElements[codeShareChildId], "/") { - if codeShare != "" { - fn, err := common.ParseFlightNumber(codeShare) + lookup[f.Id()] = f + + if codeSharesRaw := f.DataElements[codeShareChildId]; codeSharesRaw != "" { + // this flight has codeshares + for _, codeShare := range strings.Split(codeSharesRaw, "/") { + codeShareFn, err := common.ParseFlightNumber(codeShare) if err != nil { return nil, err } - f.CodeShares = appendUnique(f.CodeShares, fn) + if _, ok := f.CodeShares[common.FlightNumberStr(codeShareFn)]; !ok { + f.CodeShares[common.FlightNumberStr(codeShareFn)] = make(map[int]string) + } + + // mark as codeshare + codeShareIds[codeShareFn.Id(f.Departure())] = struct{}{} } } - lookup[f.Id()] = f - if codeShare := f.DataElements[codeShareParentId]; codeShare != "" { - fn, err := common.ParseFlightNumber(codeShare) + // this flight is a codeshare + parentFn, err := common.ParseFlightNumber(codeShare) if err != nil { return nil, err } - fid := common.FlightId{ - Number: fn, - Departure: f.Departure(), - } + parentFid := parentFn.Id(f.Departure()) - if parent, ok := lookup[fid]; ok { - parent.CodeShares = appendUnique(parent.CodeShares, f.Number()) + if parent, ok := lookup[parentFid]; ok { + parent.CodeShares[common.FlightNumberStr(f.Number())] = f.DataElements } else { - addLater[fid] = append(addLater[fid], f) + addLater[parentFid] = append(addLater[parentFid], f) } - } else { - flights = append(flights, f) - - if codeShares, ok := addLater[f.Id()]; ok { - for _, child := range codeShares { - f.CodeShares = appendUnique(f.CodeShares, child.Number()) - } - delete(addLater, f.Id()) - } + // mark self as codeshare + codeShareIds[f.Id()] = struct{}{} } } } + // add codeshares to parent for fid, codeShares := range addLater { if len(codeShares) < 1 { continue } - first := codeShares[0] - f := &common.Flight{ - Airline: fid.Number.Airline, - FlightNumber: fid.Number.Number, - Suffix: fid.Number.Suffix, - DepartureTime: first.DepartureTime, - DepartureAirport: first.DepartureAirport, - ArrivalTime: first.ArrivalTime, - ArrivalAirport: first.ArrivalAirport, - ServiceType: first.ServiceType, - AircraftOwner: first.AircraftOwner, - AircraftType: first.AircraftType, - AircraftConfigurationVersion: first.AircraftConfigurationVersion, - Registration: first.Registration, - DataElements: first.DataElements, - CodeShares: make([]common.FlightNumber, 0), + f, ok := lookup[fid] + if !ok { + // create a parent if the parent itself isn't present + first := codeShares[0] + f = &common.Flight{ + QueryDate: queryDate, + Airline: fid.Number.Airline, + FlightNumber: fid.Number.Number, + Suffix: fid.Number.Suffix, + DepartureTime: first.DepartureTime, + DepartureAirport: first.DepartureAirport, + ArrivalTime: first.ArrivalTime, + ArrivalAirport: first.ArrivalAirport, + ServiceType: first.ServiceType, + AircraftOwner: first.AircraftOwner, + AircraftType: first.AircraftType, + AircraftConfigurationVersion: first.AircraftConfigurationVersion, + Registration: first.Registration, + DataElements: make(map[int]string), + CodeShares: make(map[common.FlightNumberStr]map[int]string), + } + + lookup[fid] = f } for _, child := range codeShares { - f.CodeShares = appendUnique(f.CodeShares, child.Number()) + f.CodeShares[common.FlightNumberStr(child.Number())] = child.DataElements } + } - flights = append(flights, f) + flights := make([]*common.Flight, 0, len(lookup)-len(codeShareIds)) + for fid, f := range lookup { + if _, ok := codeShareIds[fid]; !ok { + flights = append(flights, f) + } } return flights, nil } -func appendUnique[T comparable](s []T, v T) []T { - if !slices.Contains(s, v) { - s = append(s, v) +func groupByDepartureDateUTC(results [][]*common.Flight) map[common.LocalDate][]*common.Flight { + grouped := make(map[common.LocalDate][]*common.Flight) + for _, result := range results { + for _, f := range result { + grouped[f.DepartureDate()] = append(grouped[f.DepartureDate()], f) + } } - return s + return grouped } diff --git a/go/cron/go.mod b/go/cron/go.mod index 4852241..f475919 100644 --- a/go/cron/go.mod +++ b/go/cron/go.mod @@ -6,7 +6,7 @@ require ( github.com/aws/aws-lambda-go v1.47.0 github.com/aws/aws-sdk-go-v2 v1.30.4 github.com/aws/aws-sdk-go-v2/config v1.27.28 - github.com/aws/aws-sdk-go-v2/service/s3 v1.59.0 + github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 github.com/explore-flights/monorepo/go/common v0.0.0 golang.org/x/sync v0.8.0 golang.org/x/time v0.6.0 diff --git a/go/cron/go.sum b/go/cron/go.sum index 60a23b6..25ebbfe 100644 --- a/go/cron/go.sum +++ b/go/cron/go.sum @@ -26,8 +26,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 h1:tJ5RnkHC github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18/go.mod h1:++NHzT+nAF7ZPrHPsA+ENvsXkOO8wEu+C6RXltAG4/c= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16 h1:jg16PhLPUiHIj8zYIW6bqzeQSuHVEiWnGA0Brz5Xv2I= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.16/go.mod h1:Uyk1zE1VVdsHSU7096h/rwnXDzOzYQVl+FNPhPw7ShY= -github.com/aws/aws-sdk-go-v2/service/s3 v1.59.0 h1:Cso4Ev/XauMVsbwdhYEoxg8rxZWw43CFqqaPB5w3W2c= -github.com/aws/aws-sdk-go-v2/service/s3 v1.59.0/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1 h1:mx2ucgtv+MWzJesJY9Ig/8AFHgoE5FwLXwUVgW/FGdI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.60.1/go.mod h1:BSPI0EfnYUuNHPS0uqIo5VrRwzie+Fp+YhQOUs16sKI= github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 h1:zCsFCKvbj25i7p1u94imVoO447I/sFv8qq+lGJhRN0c= github.com/aws/aws-sdk-go-v2/service/sso v1.22.5/go.mod h1:ZeDX1SnKsVlejeuz41GiajjZpRSWR7/42q/EyA/QEiM= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 h1:SKvPgvdvmiTWoi0GAJ7AsJfOz3ngVkD/ERbs5pUnHNI=