Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,56 @@ describe('Kafka.send', () => {

expect(logger.crit).toHaveBeenCalledWith(expect.stringContaining('Kafka Send Error'))
})

it('extracts nested Kafka cause for connect errors', async () => {
const producer = new (Kafka as unknown as jest.Mock)({} as KafkaConfig).producer()
;(producer.connect as unknown as jest.Mock).mockReset()

// Simulate a KafkaJSError with a nested cause coming from Kafka
const kafkaErr = new Error('outer wrapper error') as any
kafkaErr.name = 'KafkaJSError'
kafkaErr.cause = new Error('brokers down')
kafkaErr.cause.name = 'BrokerNotAvailable'
;(producer.connect as unknown as jest.Mock).mockRejectedValueOnce(kafkaErr)

const logger = { crit: jest.fn() } as any

try {
await testDestination.testAction('send', { ...(testData as any), logger })
} catch (error) {
expect(error).toBeInstanceOf(IntegrationError)
expect((error as IntegrationError).message).toBe('Kafka Connection Error - BrokerNotAvailable: brokers down')
expect((error as IntegrationError).status).toBe(500)
}

expect(logger.crit).toHaveBeenCalledWith(expect.stringContaining('BrokerNotAvailable'))
})

it('extracts nested Kafka cause for send errors', async () => {
const producer = new (Kafka as unknown as jest.Mock)({} as KafkaConfig).producer()
;(producer.connect as unknown as jest.Mock).mockReset()
;(producer.connect as unknown as jest.Mock).mockResolvedValueOnce(undefined)

// Simulate a KafkaJSError with a nested cause on send
const kafkaErr = new Error('outer wrapper error') as any
kafkaErr.name = 'KafkaJSError'
kafkaErr.cause = new Error('message too large')
kafkaErr.cause.name = 'MessageSizeTooLarge'
;(producer.send as unknown as jest.Mock).mockReset()
;(producer.send as unknown as jest.Mock).mockRejectedValueOnce(kafkaErr)

const logger = { crit: jest.fn() } as any

try {
await testDestination.testAction('send', { ...(testData as any), logger })
} catch (error) {
expect(error).toBeInstanceOf(IntegrationError)
expect((error as IntegrationError).message).toBe('Kafka Producer Error - MessageSizeTooLarge: message too large')
expect((error as IntegrationError).status).toBe(500)
}

expect(logger.crit).toHaveBeenCalledWith(expect.stringContaining('MessageSizeTooLarge'))
})
})

describe('getOrCreateProducer', () => {
Expand Down
25 changes: 19 additions & 6 deletions packages/destination-actions/src/destinations/kafka/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,14 @@ export const getOrCreateProducer = async (
return producer
}

function getKafkaError(error: Error) {
const errorCause = (error as KafkaJSError)?.cause
if (errorCause) {
return errorCause
}
return error
}

export const sendData = async (
settings: Settings,
payload: Payload[],
Expand Down Expand Up @@ -229,14 +237,18 @@ export const sendData = async (
await producer.connect()
}
} catch (error) {
logger?.crit(`Kafka Connection Error - ${(error as Error).name}: ${(error as Error).stack}`)
if ((error as Error).name !== 'IntegrationError') {
const kafkaError = getKafkaError(error as Error)
logger?.crit(
`Kafka Connection Error - ${kafkaError.name} | ${JSON.stringify(kafkaError)} | stack: ${kafkaError.stack}`
)
throw new IntegrationError(
`Kafka Connection Error - ${(error as Error).name}: ${(error as Error).message}`,
(error as Error)?.name,
`Kafka Connection Error - ${kafkaError.name}: ${kafkaError.message}`,
kafkaError.name,
500
)
} else {
logger?.crit(`Kafka Connection Error - ${error.name}: ${error as Error}`)
throw error
}
}
Expand All @@ -245,10 +257,11 @@ export const sendData = async (
try {
await producer.send(data as ProducerRecord)
} catch (error) {
logger?.crit(`Kafka Send Error - ${(error as Error).name}: ${(error as Error).stack}`)
const kafkaError = getKafkaError(error as Error)
logger?.crit(`Kafka Send Error - ${kafkaError.name} | ${JSON.stringify(kafkaError)} | stack: ${kafkaError.stack}`)
throw new IntegrationError(
`Kafka Producer Error - ${(error as Error).name}: ${(error as Error).message}`,
(error as Error)?.name,
`Kafka Producer Error - ${kafkaError.name}: ${kafkaError.message}`,
kafkaError.name,
500
)
}
Expand Down
Loading