Skip to content

Commit ae32e45

Browse files
authored
Update kafka error handling (#3201)
* Update kafka error handling * Update kafka error handling * add tests * modify error reporting * add tests * add tests * add tests * Update kafka types (#3199) * add metrics to track batch keys * add metrics to track batch keys * Remove retries for kafka client * add test * add topic as batch keys * add topic as batch keys
1 parent 56fc074 commit ae32e45

File tree

8 files changed

+571
-135
lines changed

8 files changed

+571
-135
lines changed
Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
import { Kafka, KafkaConfig, KafkaJSError } from 'kafkajs'
2+
import { MultiStatusResponse, getErrorCodeFromHttpStatus } from '@segment/actions-core'
3+
import { sendData, validate, producersByConfig, serializeKafkaConfig, getOrCreateProducer } from '../utils'
4+
import type { Settings } from '../generated-types'
5+
6+
// Replace the real `kafkajs` module with in-memory stubs so no broker is needed.
// Everything is declared inside the factory because jest hoists `jest.mock`
// calls above the imports.
jest.mock('kafkajs', () => {
  // A single producer stub is shared by every client the mocked `Kafka`
  // constructor hands out, so a test can configure connect/send/disconnect
  // on it before the code under test obtains "its" producer.
  const producerStub = {
    connect: jest.fn(),
    send: jest.fn(),
    disconnect: jest.fn()
  }

  // Stand-in for a Kafka client instance: `producer()` always yields the shared stub.
  const kafkaStub = {
    producer: jest.fn(() => producerStub)
  }

  // Minimal replacement for kafkajs' error class: an Error carrying a
  // `retriable` flag, which defaults to false when not supplied.
  class MockKafkaJSError extends Error {
    retriable?: boolean

    constructor(message?: string, retriable?: boolean) {
      super(message)
      this.retriable = retriable ?? false
    }
  }

  const kafkaFactory = jest.fn(() => kafkaStub)
  const producerFactory = jest.fn(() => producerStub)
  const partitioners = {
    LegacyPartitioner: jest.fn(),
    DefaultPartitioner: jest.fn()
  }

  return {
    Kafka: kafkaFactory,
    Producer: producerFactory,
    KafkaJSError: MockKafkaJSError,
    Partitioners: partitioners
  }
})
35+
36+
// Baseline destination settings shared by the tests: 'plain' mechanism with
// username/password and SSL enabled. Individual tests spread and override it.
const baseSettings: Settings = {
  clientId: 'client',
  brokers: ['broker1:9092', 'broker2:9092'].join(','),
  mechanism: 'plain',
  username: 'user',
  password: 'pass',
  ssl_enabled: true
}

// A single message to publish. Cast to `any` because the action's payload
// type is not imported into this test file.
const payloadItem = {
  topic: 'topic-a',
  payload: { hello: 'world' },
  key: 'k1',
  headers: { h1: 'v1' }
} as any
51+
52+
/**
 * Tests for `validate` (credential checks per auth mechanism) and `sendData`
 * (error mapping, SSL config construction, success payload, producer caching).
 * All Kafka interaction goes through the `kafkajs` mock declared in this file.
 */
describe('kafka utils: sendData and validation', () => {
  // Obtains the shared producer stub by going through the mocked Kafka constructor.
  const getMockProducer = () => new (Kafka as unknown as jest.Mock<any, any>)({} as KafkaConfig).producer()

  beforeEach(() => {
    // Reset cached producers and all jest mocks.
    // Note: clearAllMocks only clears recorded calls/results; stubbed
    // implementations persist, so each test sets up the behavior it needs.
    for (const key of Object.keys(producersByConfig)) delete producersByConfig[key]
    jest.clearAllMocks()
  })

  it('validate throws for missing credentials (plain)', () => {
    const bad: Settings = { ...baseSettings, username: undefined as any }
    expect(() => validate(bad)).toThrow('Username and Password are required')
  })

  it('validate throws for missing credentials (aws)', () => {
    const bad: Settings = {
      ...baseSettings,
      mechanism: 'aws',
      accessKeyId: undefined as any,
      secretAccessKey: undefined as any,
      username: undefined as any,
      password: undefined as any
    }
    expect(() => validate(bad)).toThrow('AWS Access Key ID and AWS Secret Key are required')
  })

  it('validate throws for missing credentials (client-cert-auth)', () => {
    const bad: Settings = {
      clientId: 'c',
      brokers: 'b1',
      mechanism: 'client-cert-auth',
      ssl_enabled: true
    } as any
    expect(() => validate(bad)).toThrow('SSL Client Key and SSL Client Certificate are required')
  })

  it('returns MultiStatusResponse with mapped error when connect fails', async () => {
    const mockProducer = getMockProducer()
    ;(mockProducer.connect as jest.Mock).mockImplementation(() => {
      const err = new Error('connection failed')
      err.name = 'KafkaJSConnectionError'
      throw err
    })

    const res = await sendData(baseSettings, [payloadItem], undefined, undefined)
    expect(res).toBeInstanceOf(MultiStatusResponse)
    expect(res.length()).toBe(1)
    const r = res.getResponseAtIndex(0)
    // @ts-expect-error test-time inspection of response payload
    expect(r.value().status).toBe(400)
    // @ts-expect-error test-time inspection of response payload
    expect(r.value().errortype).toBe(getErrorCodeFromHttpStatus(400))
    // Should not attempt to send
    expect(mockProducer.send).not.toHaveBeenCalled()
  })

  it('returns MultiStatusResponse with mapped error when send fails', async () => {
    const mockProducer = getMockProducer()
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)
    ;(mockProducer.send as jest.Mock).mockImplementation(() => {
      const err = new Error('invalid message')
      err.name = 'KafkaJSInvalidMessage'
      throw err
    })

    const res = await sendData(baseSettings, [payloadItem], undefined, undefined)
    expect(res.length()).toBe(1)
    const r = res.getResponseAtIndex(0)
    // @ts-expect-error test-time inspection of response payload
    expect(r.value().status).toBe(400)
    // @ts-expect-error test-time inspection of response payload
    expect(r.value().errortype).toBe(getErrorCodeFromHttpStatus(400))
    // Disconnect should be called when feature flag is off
    expect(mockProducer.disconnect).toHaveBeenCalled()
  })

  it('returns 500 when retriable KafkaJSError occurs', async () => {
    const mockProducer = getMockProducer()
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)
    ;(mockProducer.send as jest.Mock).mockImplementation(() => {
      const err = new (KafkaJSError as any)('boom', true)
      // Set explicitly as well, so the flag does not depend on the mock's constructor signature.
      err.retriable = true
      throw err
    })

    const res = await sendData(baseSettings, [payloadItem], undefined, undefined)
    const r = res.getResponseAtIndex(0)
    // @ts-expect-error reading test fields
    expect(r.value().status).toBe(500)
    // @ts-expect-error reading test fields
    expect(r.value().errortype).toBe(getErrorCodeFromHttpStatus(500))
  })

  it('returns 400 when non-retriable KafkaJSError occurs', async () => {
    const mockProducer = getMockProducer()
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)
    ;(mockProducer.send as jest.Mock).mockImplementation(() => {
      const err = new (KafkaJSError as any)('no retry', false)
      // Set explicitly as well, so the flag does not depend on the mock's constructor signature.
      err.retriable = false
      throw err
    })

    const res = await sendData(baseSettings, [payloadItem], undefined, undefined)
    const r = res.getResponseAtIndex(0)
    // @ts-expect-error reading test fields
    expect(r.value().status).toBe(400)
    // @ts-expect-error reading test fields
    expect(r.value().errortype).toBe(getErrorCodeFromHttpStatus(400))
  })

  it('builds SSL config when ssl_ca is provided', async () => {
    const mockProducer = getMockProducer()
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)
    ;(mockProducer.send as jest.Mock).mockResolvedValue([{ partition: 0, offset: '1' }])
    const settings: Settings = {
      ...baseSettings,
      ssl_ca: 'CACERT',
      ssl_reject_unauthorized_ca: true
    }
    await sendData(settings, [payloadItem], undefined, undefined)
    // The raw CA value is expected to be wrapped in PEM certificate armor.
    expect(Kafka).toHaveBeenCalledWith(
      expect.objectContaining({
        ssl: {
          ca: ['-----BEGIN CERTIFICATE-----\nCACERT\n-----END CERTIFICATE-----'],
          rejectUnauthorized: true
        }
      })
    )
  })

  it('adds ssl.key and ssl.cert when mechanism is client-cert-auth', async () => {
    const mockProducer = getMockProducer()
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)
    ;(mockProducer.send as jest.Mock).mockResolvedValue([{ partition: 0, offset: '1' }])
    const settings: Settings = {
      clientId: 'client',
      brokers: 'broker',
      mechanism: 'client-cert-auth',
      ssl_enabled: true,
      ssl_ca: 'CACERT',
      ssl_key: 'KEY',
      ssl_cert: 'CERT',
      ssl_reject_unauthorized_ca: true
    } as any
    await sendData(settings, [payloadItem], undefined, undefined)
    expect(Kafka).toHaveBeenCalledWith(
      expect.objectContaining({
        ssl: expect.objectContaining({
          // Split to avoid secret scanners matching PKCS#8 header/footer in repo
          key: '-----BEGIN PRIV' + 'ATE KEY-----\n' + 'KEY' + '\n-----END PRIV' + 'ATE KEY-----',
          cert: '-----BEGIN CERTIFICATE-----\nCERT\n-----END CERTIFICATE-----'
        })
      })
    )
  })

  it('returns success MultiStatusResponse with body on success', async () => {
    const mockProducer = getMockProducer()
    const fakeKafkaResponse = 'success'
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)
    ;(mockProducer.send as jest.Mock).mockResolvedValue(fakeKafkaResponse)

    const res = await sendData(baseSettings, [payloadItem], undefined, undefined)
    expect(res.length()).toBe(1)
    const r = res.getResponseAtIndex(0)
    // @ts-expect-error test-time inspection of response payload
    expect(r.value().status).toBe(200)
    // @ts-expect-error test-time inspection of response payload
    expect(r.value().body).toEqual(fakeKafkaResponse)
    // @ts-expect-error inspect 'sent' content mapping
    expect(r.value().sent).toMatchObject({
      value: JSON.stringify(payloadItem.payload),
      key: 'k1',
      headers: { h1: 'v1' },
      partition: undefined,
      partitionerType: 'DefaultPartitioner'
    })
    expect(mockProducer.disconnect).toHaveBeenCalled()
  })

  it('does not disconnect when feature flag is enabled and updates cache', async () => {
    const mockProducer = getMockProducer()
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)
    ;(mockProducer.send as jest.Mock).mockResolvedValue([{ partition: 0, offset: '2' }])

    const features = { 'actions-kafka-optimize-connection': true } as any
    const key = serializeKafkaConfig(baseSettings)

    // Freeze the clock so the cache entry's lastUsed timestamp is predictable.
    const nowSpy = jest.spyOn(Date, 'now').mockReturnValue(111111)
    try {
      const res = await sendData(baseSettings, [payloadItem], features, undefined)
      expect(res.length()).toBe(1)
      expect(mockProducer.disconnect).not.toHaveBeenCalled()
      // ensure cache created and lastUsed set
      expect(producersByConfig[key]).toBeDefined()
      expect(producersByConfig[key].lastUsed).toBe(111111)
    } finally {
      // Restore even when an assertion above fails, so the stubbed clock
      // cannot leak into later tests.
      nowSpy.mockRestore()
    }
  })

  it('uses default_partition when partition not provided', async () => {
    const mockProducer = getMockProducer()
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)
    ;(mockProducer.send as jest.Mock).mockResolvedValue([{ partition: 3, offset: '5' }])
    const payload = { ...payloadItem, partition: undefined, default_partition: 7 }
    const res = await sendData(baseSettings, [payload], undefined, undefined)
    const r = res.getResponseAtIndex(0)
    // @ts-expect-error reading test fields
    expect(r.value().sent.partition).toBe(7)
  })

  it('returns 400 error when Kafka constructor throws (default mapping)', async () => {
    const KafkaMock = Kafka as unknown as jest.Mock
    // Only the next construction fails; later tests get the normal stub again.
    KafkaMock.mockImplementationOnce(() => {
      throw new Error('ctor fail')
    })

    const res = await sendData(baseSettings, [payloadItem], undefined, undefined)
    const r = res.getResponseAtIndex(0)
    // @ts-expect-error reading test fields
    expect(r.value().status).toBe(400)
    // @ts-expect-error reading test fields
    expect(r.value().errortype).toBe(getErrorCodeFromHttpStatus(400))
  })
})
277+
278+
/**
 * Tests that `getOrCreateProducer` reports connection lifecycle metrics
 * (opened / reused / closed / disconnect error) through the stats client.
 */
describe('kafka utils: getOrCreateProducer stats', () => {
  const settings: Settings = { ...baseSettings }

  // Handle to the Date.now spy installed by the current test, if any.
  let nowSpy: jest.SpyInstance | undefined

  // Pins Date.now to a fixed timestamp for the remainder of the current test.
  const freezeClock = (timestamp: number): void => {
    nowSpy = jest.spyOn(Date, 'now').mockReturnValue(timestamp)
  }

  beforeEach(() => {
    // Drop cached producers and clear recorded mock calls.
    for (const key of Object.keys(producersByConfig)) delete producersByConfig[key]
    jest.clearAllMocks()
  })

  afterEach(() => {
    // jest.clearAllMocks() does not undo mockReturnValue, so the clock stub
    // must be restored explicitly or the next test's Date.now() would still
    // return the previous test's frozen value.
    nowSpy?.mockRestore()
    nowSpy = undefined
  })

  it('increments kafka_connection_opened on new connection', async () => {
    const statsClient = { incr: jest.fn() }
    const statsContext = { statsClient, tags: { env: 'test' } } as any
    const producer = await getOrCreateProducer(settings, statsContext)
    expect(statsClient.incr).toHaveBeenCalledWith('kafka_connection_opened', 1, { env: 'test' })
    // Producer connected
    expect((producer.connect as unknown as jest.Mock).mock.calls.length).toBeGreaterThanOrEqual(1)
  })

  it('increments kafka_connection_reused when cache hit', async () => {
    const statsClient = { incr: jest.fn() }
    const statsContext = { statsClient, tags: { env: 'test' } } as any

    const key = serializeKafkaConfig(settings)
    const now = 1000
    freezeClock(now)

    const mockProducer = new (Kafka as any)({} as KafkaConfig).producer()
    ;(mockProducer.connect as jest.Mock).mockResolvedValue(undefined)

    // Seed the cache with an entry last used "now" so it counts as fresh.
    producersByConfig[key] = { producer: mockProducer, isConnected: true, lastUsed: now }

    const p = await getOrCreateProducer(settings, statsContext)
    expect(p).toBe(mockProducer)
    expect(statsClient.incr).toHaveBeenCalledWith('kafka_connection_reused', 1, { env: 'test' })
  })

  it('increments kafka_connection_closed on expired connection cleanup', async () => {
    const statsClient = { incr: jest.fn() }
    const statsContext = { statsClient, tags: { env: 'test' } } as any

    const key = serializeKafkaConfig(settings)
    const now = Date.now()
    // 31 minutes in the past; presumably beyond the producer TTL — confirm against PRODUCER_TTL_MS.
    const expired = now - 31 * 60 * 1000
    freezeClock(now)

    const mockProducer = new (Kafka as any)({} as KafkaConfig).producer()
    ;(mockProducer.disconnect as jest.Mock).mockResolvedValue(undefined)

    producersByConfig[key] = { producer: mockProducer, isConnected: true, lastUsed: expired }

    await getOrCreateProducer(settings, statsContext)
    expect(statsClient.incr).toHaveBeenCalledWith('kafka_connection_closed', 1, { env: 'test' })
  })

  it('increments kafka_disconnect_error when disconnect throws on expired cleanup', async () => {
    const statsClient = { incr: jest.fn() }
    const statsContext = { statsClient, tags: { env: 'test' } } as any

    const key = serializeKafkaConfig(settings)
    const now = Date.now()
    // 31 minutes in the past; presumably beyond the producer TTL — confirm against PRODUCER_TTL_MS.
    const expired = now - 31 * 60 * 1000
    freezeClock(now)

    const mockProducer = new (Kafka as any)({} as KafkaConfig).producer()
    ;(mockProducer.disconnect as jest.Mock).mockImplementation(() => {
      throw new Error('disconnect fail')
    })

    producersByConfig[key] = { producer: mockProducer, isConnected: true, lastUsed: expired }

    await getOrCreateProducer(settings, statsContext)
    expect(statsClient.incr).toHaveBeenCalledWith('kafka_disconnect_error', 1, { env: 'test' })
  })
})

packages/destination-actions/src/destinations/kafka/constants.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ export const PRODUCER_TTL_MS = Number(process.env.KAFKA_PRODUCER_TTL_MS) || 0.5
22

33
export const PRODUCER_REQUEST_TIMEOUT_MS = Number(process.env.KAFKA_PRODUCER_REQUEST_TIMEOUT_MS) || 10 * 1000 // defaults to 10 seconds
44

5-
export const FLAGON_NAME = 'actions-kafka-optimize-connection'
5+
export const FLAGON_NAME = 'actions-kafka-optimize-connection'

packages/destination-actions/src/destinations/kafka/generated-types.ts

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/destination-actions/src/destinations/kafka/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const destination: DestinationDefinition<Settings> = {
2929
mechanism: {
3030
label: 'Authentication Mechanism',
3131
description:
32-
"Select the Authentication Mechanism to use. For SCRAM or PLAIN populate the 'Username' and 'Password' fields. For AWS IAM populated the 'AWS Access Key ID' and 'AWS Secret Key' fields. For 'Client Certificate' populated the 'SSL Client Key' and 'SSL Client Certificate' fields",
32+
"Select the Authentication Mechanism to use. For SCRAM or PLAIN populate the 'Username' and 'Password' fields. For 'Client Certificate' populated the 'SSL Client Key' and 'SSL Client Certificate' fields",
3333
type: 'string',
3434
required: true,
3535
choices: [

0 commit comments

Comments
 (0)