Skip to content

Commit 4721dda

Browse files
authored
fix(parser): Firehose SQS should fail for invalid SQS message (aws-powertools#3526)
1 parent 89a6281 commit 4721dda

File tree

3 files changed

+78
-21
lines changed

3 files changed

+78
-21
lines changed

packages/parser/src/schemas/kinesis-firehose.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,19 @@ const KinesisFirehoseRecordSchema = KinesisFireHoseRecordBase.extend({
3535
* Zod schema for a SQS record from an Kinesis Firehose event.
3636
*/
3737
const KinesisFirehoseSqsRecordSchema = KinesisFireHoseRecordBase.extend({
38-
data: z.string().transform((data) => {
38+
data: z.string().transform((data, ctx) => {
3939
try {
4040
return SqsRecordSchema.parse(
4141
JSON.parse(Buffer.from(data, 'base64').toString('utf8'))
4242
);
4343
} catch (e) {
44-
return data;
44+
ctx.addIssue({
45+
code: z.ZodIssueCode.custom,
46+
message: 'Failed to parse SQS record',
47+
fatal: true,
48+
});
49+
50+
return z.NEVER;
4551
}
4652
}),
4753
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"invocationId": "556b67a3-48fc-4385-af49-e133aade9cb9",
3+
"deliveryStreamArn": "arn:aws:firehose:us-east-1:123456789012:deliverystream/PUT-S3-tdyyE",
4+
"region": "us-east-1",
5+
"records": [
6+
{
7+
"recordId": "49640912821178817833517986466168945147170627572855734274000000",
8+
"approximateArrivalTimestamp": 1684864917398,
9+
"data": "bm90IGEgdmFsaWQgamFzb24="
10+
}
11+
]
12+
}

packages/parser/tests/unit/schema/kinesis.test.ts

+58-19
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import type {
2121
} from '../../../src/types/schema';
2222
import { getTestEvent } from './utils.js';
2323

24-
describe('Kinesis ', () => {
24+
describe('Schema: Kinesis', () => {
2525
const eventsPath = 'kinesis';
2626

2727
const kinesisStreamEvent = getTestEvent<KinesisDataStreamEvent>({
@@ -56,8 +56,11 @@ describe('Kinesis ', () => {
5656
}
5757
);
5858

59-
it('should parse kinesis event', () => {
59+
it('parses kinesis event', () => {
60+
// Prepare
6061
const testEvent = structuredClone(kinesisStreamEvent);
62+
63+
// Act
6164
const parsed = KinesisDataStreamSchema.parse(testEvent);
6265

6366
const transformedInput = {
@@ -72,10 +75,14 @@ describe('Kinesis ', () => {
7275
}),
7376
};
7477

78+
// Assess
7579
expect(parsed).toStrictEqual(transformedInput);
7680
});
77-
it('should parse single kinesis record', () => {
81+
it('parses single kinesis record', () => {
82+
// Prepare
7883
const testEvent = structuredClone(kinesisStreamEventOneRecord);
84+
85+
// Act
7986
const parsed = KinesisDataStreamSchema.parse(testEvent);
8087

8188
const transformedInput = {
@@ -92,10 +99,14 @@ describe('Kinesis ', () => {
9299
}),
93100
};
94101

102+
// Assess
95103
expect(parsed).toStrictEqual(transformedInput);
96104
});
97-
it('should parse Firehose event', () => {
105+
it('parses Firehose event', () => {
106+
// Prepare
98107
const testEvent = structuredClone(kinesisFirehoseEvent);
108+
109+
// Act
99110
const parsed = KinesisFirehoseSchema.parse(testEvent);
100111

101112
const transformedInput = {
@@ -108,11 +119,15 @@ describe('Kinesis ', () => {
108119
};
109120
}),
110121
};
122+
123+
// Assess
111124
expect(parsed).toStrictEqual(transformedInput);
112125
});
113-
it('should parse Kinesis Firehose PutEvents event', () => {
126+
it('parses Kinesis Firehose PutEvents event', () => {
127+
// Prepare
114128
const testEvent = structuredClone(kinesisFirehosePutEvent);
115129

130+
// Act
116131
const parsed = KinesisFirehoseSchema.parse(testEvent);
117132

118133
const transformedInput = {
@@ -125,11 +140,14 @@ describe('Kinesis ', () => {
125140
}),
126141
};
127142

143+
// Assess
128144
expect(parsed).toStrictEqual(transformedInput);
129145
});
130-
it('should parse Firehose event with SQS event', () => {
146+
it('parses Firehose event with SQS event', () => {
147+
// Prepare
131148
const testEvent = structuredClone(kinesisFirehoseSQSEvent);
132149

150+
// Act
133151
const parsed = KinesisFirehoseSqsSchema.parse(testEvent);
134152

135153
const transformedInput = {
@@ -138,21 +156,24 @@ describe('Kinesis ', () => {
138156
return {
139157
...record,
140158
data: JSON.parse(
141-
Buffer.from(record.data as string, 'base64').toString()
159+
Buffer.from(record.data as unknown as string, 'base64').toString()
142160
),
143161
};
144162
}),
145163
};
146164

165+
// Assess
147166
expect(parsed).toStrictEqual(transformedInput);
148167
});
149-
it('should parse Kinesis event with CloudWatch event', () => {
168+
it('parses Kinesis event with CloudWatch event', () => {
169+
// Prepare
150170
const testEvent = structuredClone(kinesisStreamCloudWatchLogsEvent);
151171

172+
// Act
152173
const parsed = KinesisDataStreamSchema.parse(testEvent);
153174

154175
const transformedInput = {
155-
Records: testEvent.Records.map((record, index) => {
176+
Records: testEvent.Records.map((record) => {
156177
return {
157178
...record,
158179
kinesis: {
@@ -167,31 +188,39 @@ describe('Kinesis ', () => {
167188
}),
168189
};
169190

191+
// Assess
170192
expect(parsed).toStrictEqual(transformedInput);
171193
});
172-
it('should return original value if cannot parse KinesisFirehoseSqsRecord', () => {
173-
const testEvent = structuredClone(kinesisFirehoseSQSEvent);
174-
testEvent.records[0].data = 'not a valid json';
175-
176-
const parsed = KinesisFirehoseSqsSchema.parse(testEvent);
194+
it('throws if cannot parse SQS record of KinesisFirehoseSqsRecord', () => {
195+
// Prepare
196+
const testEvent = getTestEvent<KinesisFireHoseSqsEvent>({
197+
eventsPath,
198+
filename: 'firehose-sqs-invalid',
199+
});
177200

178-
expect(parsed).toStrictEqual(testEvent);
201+
// Act & Assess
202+
expect(() => KinesisFirehoseSqsSchema.parse(testEvent)).toThrow();
179203
});
180-
it('should parse a kinesis record from a kinesis event', () => {
204+
it('parses a kinesis record from a kinesis event', () => {
205+
// Prepare
181206
const testEvent: KinesisDataStreamEvent =
182207
structuredClone(kinesisStreamEvent);
183208

209+
// Act
184210
const parsedRecord = KinesisDataStreamRecord.parse(testEvent.Records[0]);
185211

212+
// Assess
186213
expect(parsedRecord.eventSource).toEqual('aws:kinesis');
187214
expect(parsedRecord.eventName).toEqual('aws:kinesis:record');
188215
});
189216

190-
it('should parse a kinesis record from dynamodb stream event', () => {
217+
it('parses a kinesis record from dynamodb stream event', () => {
218+
// Prepare
191219
const testEvent = getTestEvent<KinesisDynamoDBStreamEvent>({
192220
eventsPath,
193221
filename: 'dynamodb-stream',
194222
});
223+
195224
const expectedRecords = [
196225
{
197226
awsRegion: 'eu-west-1',
@@ -231,26 +260,36 @@ describe('Kinesis ', () => {
231260
},
232261
];
233262

263+
// Act
234264
const parsedRecord = KinesisDynamoDBStreamSchema.parse(testEvent);
235265

266+
// Assess
236267
expect(parsedRecord.Records.map((record) => record.kinesis.data)).toEqual(
237268
expectedRecords
238269
);
239270
});
240271

241-
it('should parse a kinesis firehose record from a kinesis firehose event', () => {
272+
it('parses a kinesis firehose record from a kinesis firehose event', () => {
273+
// Prepare
242274
const testEvent = structuredClone(kinesisFirehoseEvent);
275+
276+
// Act
243277
const parsedRecord: KinesisFirehoseRecord =
244278
KinesisFirehoseRecordSchema.parse(testEvent.records[0]);
245279

280+
// Assess
246281
expect(parsedRecord.data).toEqual('Hello World');
247282
});
248283

249-
it('should parse a sqs record from a kinesis firehose event', () => {
284+
it('parses a sqs record from a kinesis firehose event', () => {
285+
// Prepare
250286
const kinesisFireHoseSqsEvent = structuredClone(kinesisFirehoseSQSEvent);
287+
288+
// Act
251289
const parsed: KinesisFirehoseSqsRecord =
252290
KinesisFirehoseSqsRecordSchema.parse(kinesisFireHoseSqsEvent.records[0]);
253291

292+
// Assess
254293
expect(parsed.recordId).toEqual(
255294
'49640912821178817833517986466168945147170627572855734274000000'
256295
);

0 commit comments

Comments
 (0)