Skip to content

Commit

Permalink
switch to a new json streaming library
Browse files Browse the repository at this point in the history
  • Loading branch information
its-felix committed Nov 7, 2024
1 parent e3efbf5 commit bcc868c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 70 deletions.
123 changes: 56 additions & 67 deletions go/api/data/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import (
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/bcicen/jstream"
"github.com/explore-flights/monorepo/go/common"
"github.com/explore-flights/monorepo/go/common/adapt"
"github.com/explore-flights/monorepo/go/common/lufthansa"
"github.com/explore-flights/monorepo/go/common/xtime"
jsoniter "github.com/json-iterator/go"
"io"
"iter"
"log/slog"
Expand Down Expand Up @@ -346,16 +346,20 @@ func (h *Handler) Aircraft(ctx context.Context) ([]Aircraft, error) {
}

func (h *Handler) FlightSchedule(ctx context.Context, fn common.FlightNumber) (*common.FlightSchedule, error) {
schedules, err := h.flightSchedulesFull(ctx, fn.Airline)
if err != nil {
return nil, err
}
var fs *common.FlightSchedule
err := h.flightSchedulesStream(ctx, fn.Airline, func(seq iter.Seq2[string, *onceIter[*common.FlightSchedule]]) error {
for fnRaw, scheduleIt := range seq {
if fnRaw == fn.String() {
var err error
fs, err = scheduleIt.Read()
return err
}
}

if schedules == nil {
return nil, nil
}
return nil
})

return schedules[fn], nil
return fs, err
}

func (h *Handler) Flight(ctx context.Context, fn common.FlightNumber, departureDateUTC xtime.LocalDate, departureAirport string, allowCodeShare bool) (*common.Flight, time.Time, error) {
Expand Down Expand Up @@ -503,14 +507,15 @@ func (h *Handler) seatMapS3Key(airline common.AirlineIdentifier, aircraftType, a
}

func (h *Handler) QuerySchedules(ctx context.Context, airline common.AirlineIdentifier, aircraftType, aircraftConfigurationVersion string) (map[common.FlightNumber][]RouteAndRange, error) {
schedules, err := h.flightSchedulesFull(ctx, airline)
if err != nil {
return nil, err
}

result := make(map[common.FlightNumber][]RouteAndRange)
if schedules != nil {
for fn, fs := range schedules {
err := h.flightSchedulesStream(ctx, airline, func(seq iter.Seq2[string, *onceIter[*common.FlightSchedule]]) error {
for _, scheduleIt := range seq {
fs, err := scheduleIt.Read()
if err != nil {
return err
}

fn := fs.Number()
for _, variant := range fs.Variants {
if variant.Data.ServiceType == "J" && variant.Data.AircraftType == aircraftType && variant.Data.AircraftConfigurationVersion == aircraftConfigurationVersion && variant.Data.OperatedAs == fn {
if cnt, span := variant.Ranges.Span(); cnt > 0 {
Expand All @@ -537,12 +542,14 @@ func (h *Handler) QuerySchedules(ctx context.Context, airline common.AirlineIden
}
}
}
}

return result, nil
return nil
})

return result, err
}

func (h *Handler) flightSchedulesStream(ctx context.Context, airline common.AirlineIdentifier, fn func(seq iter.Seq[jstream.KV]) error) error {
func (h *Handler) flightSchedulesStream(ctx context.Context, airline common.AirlineIdentifier, fn func(seq iter.Seq2[string, *onceIter[*common.FlightSchedule]]) error) error {
resp, err := h.s3c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(h.bucket),
Key: aws.String(fmt.Sprintf("processed/schedules/%s.json.gz", airline)),
Expand All @@ -563,51 +570,19 @@ func (h *Handler) flightSchedulesStream(ctx context.Context, airline common.Airl
return err
}

decoder := jstream.NewDecoder(r, 1).EmitKV()
err = fn(func(yield func(jstream.KV) bool) {
for mv := range decoder.Stream() {
if !yield(mv.Value.(jstream.KV)) {
return
}
}
})

if err != nil {
return err
}

if err = decoder.Err(); err != nil {
return err
}
defer r.Close()

return nil
}
it := jsoniter.Parse(jsoniter.ConfigCompatibleWithStandardLibrary, r, 8196)
err = fn(func(yield func(string, *onceIter[*common.FlightSchedule]) bool) {
it.ReadObjectCB(func(value *jsoniter.Iterator, key string) bool {
oit := &onceIter[*common.FlightSchedule]{it: value}
defer oit.Consume()

func (h *Handler) flightSchedulesFull(ctx context.Context, airline common.AirlineIdentifier) (map[common.FlightNumber]*common.FlightSchedule, error) {
resp, err := h.s3c.GetObject(ctx, &s3.GetObjectInput{
Bucket: aws.String(h.bucket),
Key: aws.String(fmt.Sprintf("processed/schedules/%s.json.gz", airline)),
return yield(key, oit)
})
})

if err != nil {
if adapt.IsS3NotFound(err) {
return nil, nil
} else {
return nil, err
}
}

defer resp.Body.Close()

r, err := gzip.NewReader(resp.Body)
if err != nil {
return nil, err
}

defer r.Close()

var schedules map[common.FlightNumber]*common.FlightSchedule
return schedules, json.NewDecoder(r).Decode(&schedules)
return errors.Join(err, it.Error)
}

func (h *Handler) loadCsv(ctx context.Context, name string) (*csvReader, error) {
Expand Down Expand Up @@ -679,12 +654,26 @@ func (r *csvReader) Close() error {
return r.c.Close()
}

func compareBool(a, b bool) int {
if a == b {
return 0
} else if a {
return 1
} else {
return -1
// onceIter wraps a jsoniter.Iterator positioned at a single JSON value and
// lets that value be either decoded into a T or skipped, at most once.
type onceIter[T any] struct {
	// it is the underlying iterator the value is read from or skipped on.
	it *jsoniter.Iterator

	// read is set as soon as the value has been decoded or skipped, so the
	// underlying iterator is never advanced twice for the same value.
	read bool

	// v and err hold the outcome returned by Read; repeated calls to Read
	// return these cached fields instead of touching the iterator again.
	v   T
	err error
}

// Read decodes the current JSON value into a T and returns it.
//
// The value is decoded at most once: the first call advances the underlying
// iterator and caches the outcome, every later call returns the cached value
// and error. If the value was already skipped via Consume, Read returns the
// zero value of T and a nil error without touching the iterator.
//
// ReadVal has no return value and reports failures through the iterator's
// Error field, so that field is captured here; otherwise a decode failure
// would be handed to the caller as a nil error together with a nil or
// partially populated value. On failure the zero value of T is returned.
func (it *onceIter[T]) Read() (T, error) {
	if it.read {
		return it.v, it.err
	}

	it.read = true
	it.it.ReadVal(&it.v)

	if err := it.it.Error; err != nil {
		// Do not expose a partially decoded value alongside an error.
		var zero T
		it.v = zero
		it.err = err
	}

	return it.v, it.err
}

// Consume skips the current JSON value on the underlying iterator unless it
// has already been decoded by Read or skipped by an earlier Consume call.
// It marks the value as handled, so calling it repeatedly is a no-op.
func (it *onceIter[T]) Consume() {
	if it.read {
		// Value was already decoded or skipped; the iterator has moved on.
		return
	}

	it.read = true
	it.it.Skip()
}
4 changes: 3 additions & 1 deletion go/api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.28.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.66.0
github.com/aws/aws-sdk-go-v2/service/ssm v1.55.2
github.com/bcicen/jstream v1.0.1
github.com/explore-flights/monorepo/go/common v0.0.0
github.com/goccy/go-graphviz v0.2.2
github.com/gofrs/uuid/v5 v5.3.0
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/gorilla/feeds v1.2.0
github.com/json-iterator/go v1.1.12
github.com/labstack/echo/v4 v4.12.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.7.0
Expand Down Expand Up @@ -43,6 +43,8 @@ require (
github.com/labstack/gommon v0.4.2 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/tetratelabs/wazero v1.8.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
Expand Down
10 changes: 8 additions & 2 deletions go/api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 h1:CiS7i0+FUe+/YY1GvIBLLrR/XNGZ
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2/go.mod h1:HtaiBI8CjYoNVde8arShXb94UbQQi9L4EMr6D+xGBwo=
github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM=
github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/bcicen/jstream v1.0.1 h1:BXY7Cu4rdmc0rhyTVyT3UkxAiX3bnLpKLas9btbH5ck=
github.com/bcicen/jstream v1.0.1/go.mod h1:9ielPxqFry7Y4Tg3j4BfjPocfJ3TbsRtXOAYXYmRuAQ=
github.com/corona10/goimagehash v1.1.0 h1:teNMX/1e+Wn/AYSbLHX8mj+mF9r60R1kBeqE9MkoYwI=
github.com/corona10/goimagehash v1.1.0/go.mod h1:VkvE0mLn84L4aF8vCb6mafVajEb6QYMHl2ZJLn0mOGI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -59,12 +57,15 @@ github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF0
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/feeds v1.2.0 h1:O6pBiXJ5JHhPvqy53NsjKOThq+dNFm8+DFrxBEdzSCc=
github.com/gorilla/feeds v1.2.0/go.mod h1:WMib8uJP3BbY+X8Szd1rA5Pzhdfh+HCCAYT2z7Fza6Y=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand All @@ -78,13 +79,18 @@ github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovk
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tetratelabs/wazero v1.8.1 h1:NrcgVbWfkWvVc4UtT4LRLDf91PsOzDzefMdwhLfA550=
Expand Down

0 comments on commit bcc868c

Please sign in to comment.