Skip to content

Commit 60674bb

Browse files
authored
Extract KafkaJSErrors from cause property (#3294)
* Extract KafkaJSErrors from cause property * Add tests
1 parent 606d80c commit 60674bb

File tree

2 files changed

+69
-6
lines changed

2 files changed

+69
-6
lines changed

packages/destination-actions/src/destinations/kafka/send/__tests__/index.test.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,56 @@ describe('Kafka.send', () => {
354354

355355
expect(logger.crit).toHaveBeenCalledWith(expect.stringContaining('Kafka Send Error'))
356356
})
357+
358+
it('extracts nested Kafka cause for connect errors', async () => {
359+
const producer = new (Kafka as unknown as jest.Mock)({} as KafkaConfig).producer()
360+
;(producer.connect as unknown as jest.Mock).mockReset()
361+
362+
// Simulate a KafkaJSError with a nested cause coming from Kafka
363+
const kafkaErr = new Error('outer wrapper error') as any
364+
kafkaErr.name = 'KafkaJSError'
365+
kafkaErr.cause = new Error('brokers down')
366+
kafkaErr.cause.name = 'BrokerNotAvailable'
367+
;(producer.connect as unknown as jest.Mock).mockRejectedValueOnce(kafkaErr)
368+
369+
const logger = { crit: jest.fn() } as any
370+
371+
try {
372+
await testDestination.testAction('send', { ...(testData as any), logger })
373+
} catch (error) {
374+
expect(error).toBeInstanceOf(IntegrationError)
375+
expect((error as IntegrationError).message).toBe('Kafka Connection Error - BrokerNotAvailable: brokers down')
376+
expect((error as IntegrationError).status).toBe(500)
377+
}
378+
379+
expect(logger.crit).toHaveBeenCalledWith(expect.stringContaining('BrokerNotAvailable'))
380+
})
381+
382+
it('extracts nested Kafka cause for send errors', async () => {
383+
const producer = new (Kafka as unknown as jest.Mock)({} as KafkaConfig).producer()
384+
;(producer.connect as unknown as jest.Mock).mockReset()
385+
;(producer.connect as unknown as jest.Mock).mockResolvedValueOnce(undefined)
386+
387+
// Simulate a KafkaJSError with a nested cause on send
388+
const kafkaErr = new Error('outer wrapper error') as any
389+
kafkaErr.name = 'KafkaJSError'
390+
kafkaErr.cause = new Error('message too large')
391+
kafkaErr.cause.name = 'MessageSizeTooLarge'
392+
;(producer.send as unknown as jest.Mock).mockReset()
393+
;(producer.send as unknown as jest.Mock).mockRejectedValueOnce(kafkaErr)
394+
395+
const logger = { crit: jest.fn() } as any
396+
397+
try {
398+
await testDestination.testAction('send', { ...(testData as any), logger })
399+
} catch (error) {
400+
expect(error).toBeInstanceOf(IntegrationError)
401+
expect((error as IntegrationError).message).toBe('Kafka Producer Error - MessageSizeTooLarge: message too large')
402+
expect((error as IntegrationError).status).toBe(500)
403+
}
404+
405+
expect(logger.crit).toHaveBeenCalledWith(expect.stringContaining('MessageSizeTooLarge'))
406+
})
357407
})
358408

359409
describe('getOrCreateProducer', () => {

packages/destination-actions/src/destinations/kafka/utils.ts

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,14 @@ export const getOrCreateProducer = async (
180180
return producer
181181
}
182182

183+
function getKafkaError(error: Error) {
184+
const errorCause = (error as KafkaJSError)?.cause
185+
if (errorCause) {
186+
return errorCause
187+
}
188+
return error
189+
}
190+
183191
export const sendData = async (
184192
settings: Settings,
185193
payload: Payload[],
@@ -229,14 +237,18 @@ export const sendData = async (
229237
await producer.connect()
230238
}
231239
} catch (error) {
232-
logger?.crit(`Kafka Connection Error - ${(error as Error).name}: ${(error as Error).stack}`)
233240
if ((error as Error).name !== 'IntegrationError') {
241+
const kafkaError = getKafkaError(error as Error)
242+
logger?.crit(
243+
`Kafka Connection Error - ${kafkaError.name} | ${JSON.stringify(kafkaError)} | stack: ${kafkaError.stack}`
244+
)
234245
throw new IntegrationError(
235-
`Kafka Connection Error - ${(error as Error).name}: ${(error as Error).message}`,
236-
(error as Error)?.name,
246+
`Kafka Connection Error - ${kafkaError.name}: ${kafkaError.message}`,
247+
kafkaError.name,
237248
500
238249
)
239250
} else {
251+
logger?.crit(`Kafka Connection Error - ${error.name}: ${error as Error}`)
240252
throw error
241253
}
242254
}
@@ -245,10 +257,11 @@ export const sendData = async (
245257
try {
246258
await producer.send(data as ProducerRecord)
247259
} catch (error) {
248-
logger?.crit(`Kafka Send Error - ${(error as Error).name}: ${(error as Error).stack}`)
260+
const kafkaError = getKafkaError(error as Error)
261+
logger?.crit(`Kafka Send Error - ${kafkaError.name} | ${JSON.stringify(kafkaError)} | stack: ${kafkaError.stack}`)
249262
throw new IntegrationError(
250-
`Kafka Producer Error - ${(error as Error).name}: ${(error as Error).message}`,
251-
(error as Error)?.name,
263+
`Kafka Producer Error - ${kafkaError.name}: ${kafkaError.message}`,
264+
kafkaError.name,
252265
500
253266
)
254267
}

0 commit comments

Comments
 (0)