Skip to content

Commit

Permalink
add sfn and cron rule for daily refresh of schedules/flights
Browse files Browse the repository at this point in the history
  • Loading branch information
its-felix committed May 5, 2024
1 parent cdb78d8 commit 9fbdd9c
Show file tree
Hide file tree
Showing 4 changed files with 224 additions and 5 deletions.
56 changes: 56 additions & 0 deletions cdk/lib/constructs/sfn-construct.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { Construct } from 'constructs';
import { IBucket } from 'aws-cdk-lib/aws-s3';
import { IFunction } from 'aws-cdk-lib/aws-lambda';
import { DefinitionBody, IStateMachine, StateMachine, TaskInput, JsonPath } from 'aws-cdk-lib/aws-stepfunctions';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';

export interface SfnConstructProps {
dataBucket: IBucket;
cronLambda: IFunction;
}

export class SfnConstruct extends Construct {
readonly flightSchedules: IStateMachine;

constructor(scope: Construct, id: string, props: SfnConstructProps) {
super(scope, id);

const definition = new LambdaInvoke(this, 'LoadSchedulesTask', {
lambdaFunction: props.cronLambda,
payload: TaskInput.fromObject({
'action': 'cron',
'params': {
'loadFlightSchedules': {
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': 'raw/LH_Public_Data/flightschedules/',
'schedule': JsonPath.stringAt('$.schedule'),
},
},
}),
payloadResponseOnly: true,
resultPath: '$.loadSchedulesResponse',
retryOnServiceExceptions: true,
})
.next(new LambdaInvoke(this, 'ConvertSchedulesTask', {
lambdaFunction: props.cronLambda,
payload: TaskInput.fromObject({
'action': 'convert_flight_schedules',
'params': {
'inputBucket': JsonPath.stringAt('$.loadSchedulesResponse.loadFlightSchedules.input.outputBucket'),
'inputPrefix': JsonPath.stringAt('$.loadSchedulesResponse.loadFlightSchedules.input.outputPrefix'),
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': 'processed/flights/',
'dateRanges': JsonPath.objectAt('$.loadSchedulesResponse.loadFlightSchedules.input.dateRanges'),
},
}),
payloadResponseOnly: true,
resultPath: '$.convertSchedulesResponse',
retryOnServiceExceptions: true,
}))

this.flightSchedules = new StateMachine(this, 'FlightSchedules', {
definitionBody: DefinitionBody.fromChainable(definition),
tracingEnabled: false,
});
}
}
27 changes: 26 additions & 1 deletion cdk/lib/stacks/cron-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { IBucket } from 'aws-cdk-lib/aws-s3';
import { CronLambdaConstruct } from '../constructs/cron-lambda-construct';
import { SfnConstruct } from '../constructs/sfn-construct';
import { EventField, Rule, RuleTargetInput, Schedule } from 'aws-cdk-lib/aws-events';
import { SfnStateMachine } from 'aws-cdk-lib/aws-events-targets';

export interface CronStackProps extends cdk.StackProps {
cronLambdaZipPath: string;
Expand All @@ -12,7 +15,7 @@ export class CronStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: CronStackProps) {
super(scope, id, props);

new CronLambdaConstruct(this, 'CronLambda', {
const cronLambda = new CronLambdaConstruct(this, 'CronLambda', {
cronLambdaZipPath: props.cronLambdaZipPath,
dataBucket: props.dataBucket,
lhApiClientId: cdk.SecretValue.cfnParameter(new cdk.CfnParameter(this, 'lhApiClientId', {
Expand All @@ -24,5 +27,27 @@ export class CronStack extends cdk.Stack {
noEcho: true,
})),
});

const sfn = new SfnConstruct(this, 'SFN', {
dataBucket: props.dataBucket,
cronLambda: cronLambda.lambda,
});

new Rule(this, 'FlightSchedulesDaily', {
schedule: Schedule.cron({
minute: '0',
hour: '10',
day: '*',
month: '*',
year: '*',
}),
targets: [
new SfnStateMachine(sfn.flightSchedules, {
input: RuleTargetInput.fromObject({
schedule: 'daily',
}),
}),
],
})
}
}
129 changes: 129 additions & 0 deletions go/cron/action/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package action

import (
"context"
"encoding/json"
"errors"
"github.com/explore-flights/monorepo/go/common"
"time"
)

type CronParams struct {
LoadFlightSchedules *struct {
OutputBucket string `json:"outputBucket"`
OutputPrefix string `json:"outputPrefix"`
Schedule string `json:"schedule"`
} `json:"loadFlightSchedules,omitempty"`
ConvertFlightSchedules *struct {
InputBucket string `json:"inputBucket"`
InputPrefix string `json:"inputPrefix"`
OutputBucket string `json:"outputBucket"`
OutputPrefix string `json:"outputPrefix"`
Schedule string `json:"schedule"`
} `json:"convertFlightSchedules,omitempty"`
}

type CronOutput struct {
LoadFlightSchedules *InputOutput[LoadFlightSchedulesParams, LoadFlightSchedulesOutput] `json:"loadFlightSchedules,omitempty"`
ConvertFlightSchedules *InputOutput[ConvertFlightSchedulesParams, ConvertFlightSchedulesOutput] `json:"convertFlightSchedules,omitempty"`
}

type InputOutput[IN any, OUT any] struct {
Input IN `json:"input"`
Output OUT `json:"output"`
}

type cronAction struct {
lfsA Action[LoadFlightSchedulesParams, LoadFlightSchedulesOutput]
cfsA Action[ConvertFlightSchedulesParams, ConvertFlightSchedulesOutput]
}

func NewCronAction(lfsA Action[LoadFlightSchedulesParams, LoadFlightSchedulesOutput], cfsA Action[ConvertFlightSchedulesParams, ConvertFlightSchedulesOutput]) Action[CronParams, CronOutput] {
return &cronAction{
lfsA: lfsA,
cfsA: cfsA,
}
}

func (c *cronAction) Handle(ctx context.Context, params CronParams) (CronOutput, error) {
var output CronOutput
var err error

if params.LoadFlightSchedules != nil {
lfsInOut := InputOutput[LoadFlightSchedulesParams, LoadFlightSchedulesOutput]{
Input: LoadFlightSchedulesParams{
OutputBucket: params.LoadFlightSchedules.OutputBucket,
OutputPrefix: params.LoadFlightSchedules.OutputPrefix,
DateRanges: nil,
},
}

switch params.LoadFlightSchedules.Schedule {
case "daily":
start := time.Now()
end := start.AddDate(0, 0, 7)

lfsInOut.Input.DateRanges = append(lfsInOut.Input.DateRanges, [2]common.LocalDate{
common.NewLocalDate(start),
common.NewLocalDate(end),
})

default:
return output, errors.New("invalid schedule")
}

if lfsInOut.Output, err = c.lfsA.Handle(ctx, lfsInOut.Input); err != nil {
return output, err
}

output.LoadFlightSchedules = &lfsInOut
}

if params.ConvertFlightSchedules != nil {
cfsInOut := InputOutput[ConvertFlightSchedulesParams, ConvertFlightSchedulesOutput]{
Input: ConvertFlightSchedulesParams{
InputBucket: params.ConvertFlightSchedules.InputBucket,
InputPrefix: params.ConvertFlightSchedules.InputPrefix,
OutputBucket: params.ConvertFlightSchedules.OutputBucket,
OutputPrefix: params.ConvertFlightSchedules.OutputPrefix,
DateRanges: nil,
},
}

switch params.LoadFlightSchedules.Schedule {
case "daily":
start := time.Now()
end := start.AddDate(0, 0, 7)

cfsInOut.Input.DateRanges = append(cfsInOut.Input.DateRanges, [2]common.LocalDate{
common.NewLocalDate(start),
common.NewLocalDate(end),
})

default:
return output, errors.New("invalid schedule")
}

if cfsInOut.Output, err = c.cfsA.Handle(ctx, cfsInOut.Input); err != nil {
return output, err
}

output.ConvertFlightSchedules = &cfsInOut
}

return output, nil
}

func handle[IN any, OUT any](ctx context.Context, act Action[IN, OUT], params json.RawMessage) ([]byte, error) {
var input IN
if err := json.Unmarshal(params, &input); err != nil {
return nil, err
}

output, err := act.Handle(ctx, input)
if err != nil {
return nil, err
}

return json.Marshal(output)
}
17 changes: 13 additions & 4 deletions go/cron/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func main() {
lambda.StartWithOptions(newHandler(s3.NewFromConfig(cfg)), lambda.WithContext(ctx))
}

func newHandler(s3c *s3.Client) func(ctx context.Context, event InputEvent) ([]byte, error) {
func newHandler(s3c *s3.Client) func(ctx context.Context, event InputEvent) (json.RawMessage, error) {
lhc := lufthansa.NewClient(
lhClientId,
lhClientSecret,
Expand All @@ -62,8 +62,9 @@ func newHandler(s3c *s3.Client) func(ctx context.Context, event InputEvent) ([]b
lAircraftAction := action.NewLoadMetadataAction(s3c, lhc, (*lufthansa.Client).AircraftRaw, "aircraft")
lfsAction := action.NewLoadFlightSchedulesAction(s3c, lhc)
cfsAction := action.NewConvertFlightSchedulesAction(s3c)
cronAction := action.NewCronAction(lfsAction, cfsAction)

return func(ctx context.Context, event InputEvent) ([]byte, error) {
return func(ctx context.Context, event InputEvent) (json.RawMessage, error) {
switch event.Action {
case "load_countries":
return handle(ctx, lCountriesAction, event.Params)
Expand All @@ -85,13 +86,16 @@ func newHandler(s3c *s3.Client) func(ctx context.Context, event InputEvent) ([]b

case "convert_flight_schedules":
return handle(ctx, cfsAction, event.Params)

case "cron":
return handle(ctx, cronAction, event.Params)
}

return nil, fmt.Errorf("unsupported action: %v", event.Action)
}
}

func handle[IN any, OUT any](ctx context.Context, act action.Action[IN, OUT], params json.RawMessage) ([]byte, error) {
func handle[IN any, OUT any](ctx context.Context, act action.Action[IN, OUT], params json.RawMessage) (json.RawMessage, error) {
var input IN
if err := json.Unmarshal(params, &input); err != nil {
return nil, err
Expand All @@ -102,5 +106,10 @@ func handle[IN any, OUT any](ctx context.Context, act action.Action[IN, OUT], pa
return nil, err
}

return json.Marshal(output)
b, err := json.Marshal(output)
if err != nil {
return nil, err
}

return b, nil
}

0 comments on commit 9fbdd9c

Please sign in to comment.