Skip to content

Commit

Permalink
improve daily cron
Browse files Browse the repository at this point in the history
  • Loading branch information
its-felix committed Oct 22, 2024
1 parent 9374bbf commit 5d29983
Show file tree
Hide file tree
Showing 11 changed files with 343 additions and 263 deletions.
30 changes: 21 additions & 9 deletions cdk/lib/constructs/cron-lambda-construct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Construct } from 'constructs';
import {
Architecture,
Code,
Function,
Function, FunctionProps,
IFunction,
Runtime,
Tracing
Expand All @@ -20,15 +20,15 @@ export interface CronLambdaConstructProps {
}

export class CronLambdaConstruct extends Construct {
readonly lambda: IFunction;
readonly lambda_1G: IFunction;
readonly lambda_10G: IFunction;

constructor(scope: Construct, id: string, props: CronLambdaConstructProps) {
super(scope, id);

this.lambda = new Function(this, 'CronLambda', {
const lambdaBaseProps = {
runtime: Runtime.PROVIDED_AL2023,
architecture: Architecture.ARM_64,
memorySize: 2048,
timeout: Duration.minutes(15),
code: Code.fromAsset(props.cronLambdaZipPath),
handler: 'bootstrap',
Expand All @@ -41,12 +41,24 @@ export class CronLambdaConstruct extends Construct {
assumedBy: new ServicePrincipal('lambda.amazonaws.com'),
managedPolicies: [{ managedPolicyArn: 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' }],
}),
} satisfies FunctionProps;

this.lambda_1G = new Function(this, 'CronLambda_1G', {
...lambdaBaseProps,
memorySize: 1024,
});

this.lambda_10G = new Function(this, 'CronLambda_10G', {
...lambdaBaseProps,
memorySize: 1024 * 10,
});

props.dataBucket.grantRead(this.lambda, 'raw/LH_Public_Data/flightschedules/*');
props.dataBucket.grantWrite(this.lambda, 'raw/LH_Public_Data/*');
props.dataBucket.grantWrite(this.lambda, 'raw/ourairports_data/*');
props.dataBucket.grantReadWrite(this.lambda, 'processed/flights/*');
props.dataBucket.grantReadWrite(this.lambda, 'processed/schedules/*');
for (const fn of [this.lambda_1G, this.lambda_10G]) {
props.dataBucket.grantRead(fn, 'raw/LH_Public_Data/flightschedules/*');
props.dataBucket.grantWrite(fn, 'raw/LH_Public_Data/*');
props.dataBucket.grantWrite(fn, 'raw/ourairports_data/*');
props.dataBucket.grantReadWrite(fn, 'processed/flights/*');
props.dataBucket.grantReadWrite(fn, 'processed/schedules/*');
}
}
}
180 changes: 105 additions & 75 deletions cdk/lib/constructs/sfn-construct.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,11 @@ import { Construct } from 'constructs';
import { IBucket } from 'aws-cdk-lib/aws-s3';
import { IFunction } from 'aws-cdk-lib/aws-lambda';
import {
Choice, Condition,
DefinitionBody,
Fail,
IStateMachine,
JsonPath,
Map,
Pass,
ProcessorMode,
Result,
StateMachine,
Succeed,
TaskInput
Expand All @@ -19,7 +16,8 @@ import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';

export interface SfnConstructProps {
dataBucket: IBucket;
cronLambda: IFunction;
cronLambda_1G: IFunction;
cronLambda_10G: IFunction;
webhookUrl: cdk.SecretValue;
}

Expand All @@ -29,103 +27,135 @@ export class SfnConstruct extends Construct {
constructor(scope: Construct, id: string, props: SfnConstructProps) {
super(scope, id);

const definition = new Pass(this, 'InitIterator', {
result: Result.fromObject({
values: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
}),
resultPath: '$.iterator',
})
.next(
new Map(this, 'Iterator', {
itemsPath: '$.iterator.values',
itemSelector: {
'time': JsonPath.stringAt('$.time'),
'schedule': JsonPath.format('{}:{}', JsonPath.stringAt('$.schedule'), JsonPath.stringAt('$$.Map.Item.Value')),
},
maxConcurrency: 1,
resultPath: JsonPath.DISCARD,
})
.itemProcessor(this.iterationBody(props), { mode: ProcessorMode.INLINE })
)
.next(new Succeed(this, 'Success'));
const LH_FLIGHT_SCHEDULES_PREFIX = 'raw/LH_Public_Data/flightschedules/';
const PROCESSED_FLIGHTS_PREFIX = 'processed/flights/';
const PROCESSED_SCHEDULES_PREFIX = 'processed/schedules/';

this.flightSchedules = new StateMachine(this, 'FlightSchedules', {
definitionBody: DefinitionBody.fromChainable(definition),
tracingEnabled: false,
});
}

private iterationBody(props: SfnConstructProps) {
return new LambdaInvoke(this, 'LoadSchedulesTask', {
lambdaFunction: props.cronLambda,
const definition = new LambdaInvoke(this, 'PrepareDailyCron', {
lambdaFunction: props.cronLambda_1G,
payload: TaskInput.fromObject({
'action': 'cron',
'params': {
'loadFlightSchedules': {
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': 'raw/LH_Public_Data/flightschedules/',
'prepareDailyCron': {
'time': JsonPath.stringAt('$.time'),
'schedule': JsonPath.stringAt('$.schedule'),
'offset': -1,
'total': 30 * 12,
},
},
}),
payloadResponseOnly: true,
resultPath: '$.loadSchedulesResponse',
resultSelector: {
'completed': [],
'remaining': JsonPath.objectAt('$.dateRanges'),
},
resultPath: '$.loadScheduleRanges',
retryOnServiceExceptions: true,
})
.next(new LambdaInvoke(this, 'ConvertSchedulesTask', {
lambdaFunction: props.cronLambda,
payload: TaskInput.fromObject({
'action': 'convert_flight_schedules',
'params': {
'inputBucket': JsonPath.stringAt('$.loadSchedulesResponse.loadFlightSchedules.input.outputBucket'),
'inputPrefix': JsonPath.stringAt('$.loadSchedulesResponse.loadFlightSchedules.input.outputPrefix'),
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': 'processed/flights/',
'dateRanges': JsonPath.objectAt('$.loadSchedulesResponse.loadFlightSchedules.input.dateRanges'),
},
}),
payloadResponseOnly: true,
resultPath: '$.convertSchedulesResponse',
retryOnServiceExceptions: true,
}))
.next(new LambdaInvoke(this, 'ConvertFlightsTask', {
lambdaFunction: props.cronLambda,
payload: TaskInput.fromObject({
'action': 'convert_flights',
'params': {
'inputBucket': props.dataBucket.bucketName,
'inputPrefix': 'processed/flights/',
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': 'processed/schedules/',
'dateRanges': JsonPath.objectAt('$.convertSchedulesResponse.dateRanges'),
},
}),
payloadResponseOnly: true,
resultPath: '$.convertNumbersResponse',
retryOnServiceExceptions: true,
}))
.next(
new Choice(this, 'CheckRemaining', {})
// region loop body -> remaining dates
.when(
Condition.isPresent(JsonPath.arrayGetItem(JsonPath.objectAt('$.loadScheduleRanges.remaining'), 0)),
new LambdaInvoke(this, 'LoadSchedulesTask', {
lambdaFunction: props.cronLambda_1G,
payload: TaskInput.fromObject({
'action': 'load_flight_schedules',
'params': {
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': LH_FLIGHT_SCHEDULES_PREFIX,
'dateRanges': JsonPath.objectAt('$.loadScheduleRanges.remaining'),
'allowPartial': true,
},
}),
payloadResponseOnly: true,
resultPath: '$.loadSchedulesResponse',
retryOnServiceExceptions: true,
})
.next(new LambdaInvoke(this, 'MergeScheduleRanges', {
lambdaFunction: props.cronLambda_1G,
payload: TaskInput.fromObject({
'action': 'cron',
'params': {
'mergeDateRanges': {
'first': JsonPath.objectAt('$.loadScheduleRanges.completed'),
'second': JsonPath.objectAt('$.loadSchedulesResponse.completed'),
},
},
}),
payloadResponseOnly: true,
resultSelector: {
'completed': JsonPath.objectAt('$.mergeDateRanges.dateRanges'),
'remaining': JsonPath.objectAt('$.loadSchedulesResponse.remaining'),
},
resultPath: '$.loadScheduleRanges',
retryOnServiceExceptions: true,
})),
)
// endregion
// region conversion
.otherwise(
new LambdaInvoke(this, 'ConvertSchedulesTask', {
lambdaFunction: props.cronLambda_10G,
payload: TaskInput.fromObject({
'action': 'convert_flight_schedules',
'params': {
'inputBucket': props.dataBucket.bucketName,
'inputPrefix': LH_FLIGHT_SCHEDULES_PREFIX,
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': PROCESSED_FLIGHTS_PREFIX,
'dateRanges': JsonPath.objectAt('$.loadScheduleRanges.completed'),
},
}),
payloadResponseOnly: true,
resultPath: '$.convertSchedulesResponse',
retryOnServiceExceptions: true,
})
.next(new LambdaInvoke(this, 'ConvertFlightsTask', {
lambdaFunction: props.cronLambda_10G,
payload: TaskInput.fromObject({
'action': 'convert_flights',
'params': {
'inputBucket': props.dataBucket.bucketName,
'inputPrefix': PROCESSED_FLIGHTS_PREFIX,
'outputBucket': props.dataBucket.bucketName,
'outputPrefix': PROCESSED_SCHEDULES_PREFIX,
'dateRanges': JsonPath.objectAt('$.convertSchedulesResponse.dateRanges'),
},
}),
payloadResponseOnly: true,
resultPath: '$.convertFlightsResponse',
retryOnServiceExceptions: true,
}))
)
// endregion
)
.toSingleState('ConvertTry', { outputPath: '$[0]' })
.addCatch(
this.sendWebhookTask(
'InvokeWebhookFailureTask',
props.cronLambda,
props.cronLambda_1G,
props.webhookUrl,
JsonPath.format('FlightSchedules Cron {} ({}) failed', JsonPath.executionName, JsonPath.executionStartTime),
)
.next(new Fail(this, 'IterationFailure')),
.next(new Fail(this, 'Fail')),
)
.next(this.sendWebhookTask(
'InvokeWebhookSuccessTask',
props.cronLambda,
props.cronLambda_1G,
props.webhookUrl,
JsonPath.format(
'FlightSchedules Cron {} succeeded:\nQueried:\n```json\n{}\n```\nTouched:\n```json\n{}\n```',
JsonPath.stringAt('$.time'),
JsonPath.jsonToString(JsonPath.objectAt('$.loadSchedulesResponse.loadFlightSchedules.input.dateRanges')),
JsonPath.jsonToString(JsonPath.objectAt('$.loadScheduleRanges.completed')),
JsonPath.jsonToString(JsonPath.objectAt('$.convertSchedulesResponse.dateRanges')),
),
));
))
.next(new Succeed(this, 'Success'));

this.flightSchedules = new StateMachine(this, 'FlightSchedules', {
definitionBody: DefinitionBody.fromChainable(definition),
tracingEnabled: false,
});
}

private sendWebhookTask(id: string, fn: IFunction, url: cdk.SecretValue, content: string) {
Expand Down
3 changes: 2 additions & 1 deletion cdk/lib/stacks/cron-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ export class CronStack extends cdk.Stack {

const sfn = new SfnConstruct(this, 'SFN', {
dataBucket: props.dataBucket,
cronLambda: cronLambda.lambda,
cronLambda_1G: cronLambda.lambda_1G,
cronLambda_10G: cronLambda.lambda_10G,
webhookUrl: cdk.SecretValue.cfnParameter(new cdk.CfnParameter(this, 'webhookUrl', {
type: 'String',
noEcho: true,
Expand Down
19 changes: 2 additions & 17 deletions go/common/concurrent/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,16 @@ type RMap[K comparable, V any] interface {
RLocked(f func(m RMap[K, V]))
}

type WMap[K comparable, V any] interface {
type RWMap[K comparable, V any] interface {
RMap[K, V]
Store(k K, v V)
Delete(k K)
LoadAndDelete(k K) (V, bool)
LoadOrStore(k K, v V) (V, bool)
Swap(k K, v V) (V, bool)
WLocked(f func(m WMap[K, V]))
Locked(f func(m RWMap[K, V]))
}

type RWMap[K comparable, V any] interface {
RMap[K, V]
WMap[K, V]
}

// region Map

type Map[K comparable, V any] struct {
Expand Down Expand Up @@ -129,12 +124,6 @@ func (cm Map[K, V]) RLocked(f func(m RMap[K, V])) {
f(rMapProxy[K, V](cm))
}

func (cm Map[K, V]) WLocked(f func(m WMap[K, V])) {
cm.mtx.RLock()
defer cm.mtx.RUnlock()
f(rwMapProxy[K, V](cm))
}

func (cm Map[K, V]) Locked(f func(m RWMap[K, V])) {
cm.mtx.Lock()
defer cm.mtx.Unlock()
Expand Down Expand Up @@ -195,10 +184,6 @@ func (mp rwMapProxy[K, V]) Swap(k K, v V) (V, bool) {
return Map[K, V](mp).unsafeSwap(k, v)
}

func (mp rwMapProxy[K, V]) WLocked(f func(m WMap[K, V])) {
f(mp)
}

func (mp rwMapProxy[K, V]) Locked(f func(m RWMap[K, V])) {
f(mp)
}
Expand Down
2 changes: 1 addition & 1 deletion go/common/jwks/jwks.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (v *Verifier) Key(ctx context.Context, kid string) (jose.JSONWebKey, error)
var matchedJWK jose.JSONWebKey
found := false

v.cache.WLocked(func(ops concurrent.WMap[string, jose.JSONWebKey]) {
v.cache.Locked(func(ops concurrent.RWMap[string, jose.JSONWebKey]) {
for _, jwk := range jwks.Keys {
if jwk.Valid() {
ops.Store(jwk.KeyID, jwk)
Expand Down
11 changes: 10 additions & 1 deletion go/common/local/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,21 @@ func NewS3Client(basePath string) *S3Client {
}

func (s3c *S3Client) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
fpath := filepath.Join(s3c.basePath, *params.Bucket, filepath.FromSlash(*params.Key))
finfo, err := os.Stat(fpath)
if err != nil {
return nil, err
}

f, err := os.Open(filepath.Join(s3c.basePath, *params.Bucket, filepath.FromSlash(*params.Key)))
if err != nil {
return nil, err
}

return &s3.GetObjectOutput{Body: f}, nil
return &s3.GetObjectOutput{
Body: f,
LastModified: aws.Time(finfo.ModTime()),
}, nil
}

func (s3c *S3Client) PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
Expand Down
9 changes: 8 additions & 1 deletion go/common/xiter/common.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package xiter

import "iter"
import (
"iter"
"slices"
)

func All[Slice ~[]E, E any](s Slice) iter.Seq[E] {
return func(yield func(E) bool) {
Expand Down Expand Up @@ -51,3 +54,7 @@ func Combine[T any](seqs ...iter.Seq[T]) iter.Seq[T] {
}
}
}

func Chunk[Slice ~[]E, E any](seq iter.Seq[E], n int) iter.Seq[Slice] {
return slices.Chunk(slices.AppendSeq(make(Slice, 0), seq), n)
}
Loading

0 comments on commit 5d29983

Please sign in to comment.