From 7939cb294c4563e5d62c2b23bdc63e43afd2064f Mon Sep 17 00:00:00 2001 From: Gerald Baulig Date: Tue, 5 Sep 2023 18:10:11 +0200 Subject: [PATCH] fix(kafka_client): catch KafkaJSNumberOfRetriesExceeded from new KafkaJS() new KafkaJS() has its own retry routine integrated and throws KafkaJSNumberOfRetriesExceeded after only 5 retries of 300ms are exceeded. This exception was not caught and also canceled our wrapped retry.attempt. Increasing the initialRetryTime and the number of retries could already solve the issue. However, we also wrap it in a forever retry, just in case. --- .../src/events/provider/kafka/index.ts | 124 ++++++++++-------- 1 file changed, 70 insertions(+), 54 deletions(-) diff --git a/packages/kafka-client/src/events/provider/kafka/index.ts b/packages/kafka-client/src/events/provider/kafka/index.ts index 209baa04..cb6419d9 100644 --- a/packages/kafka-client/src/events/provider/kafka/index.ts +++ b/packages/kafka-client/src/events/provider/kafka/index.ts @@ -528,68 +528,84 @@ export class Kafka { * Suspends the calling function until the producer is connected. 
*/ async start(): Promise { - const operation = retry.operation({forever: true, maxTimeout: 2000}); + const operation = retry.operation({ + forever: true, + maxTimeout: 5000, + }); return new Promise((resolveRetry) => { operation.attempt(async () => { - this.client = new KafkaJS({ - ...this.config.kafka, - logCreator: () => { - return ({level, log}) => { - const {message, ...extra} = log; - this.logger.log(toWinstonLogLevel(level), '[kafka-client] ' + message, extra); - }; - }, - }); - - this.producer = this.client.producer(); - this.admin = this.client.admin(); - const timeout = this.config.timeout || 2000; - - this.producer.connect().catch(err => { - this.logger.warn('Producer connection error: ' + err); - }); - - // waiting for producer to be ready - await new Promise((resolveProducer, rejectProducer) => { - const timer = setTimeout(() => { - const err = 'Connection timeout: Kafka host is unreachable'; - this.logger.error(err, this.config.kafka.brokers); - rejectProducer(err); - }, timeout); - - this.producer.on('producer.connect', () => { - this.producerConnected = true; - this.logger.info('The Producer is ready.'); - clearTimeout(timer); - resolveProducer(true); + try { + this.client = new KafkaJS({ + retry: { + initialRetryTime: 1000, + maxRetryTime: 10000, + retries: 100, + }, + ...this.config.kafka, + logCreator: () => { + return ({level, log}) => { + const {message, ...extra} = log; + this.logger.log(toWinstonLogLevel(level), '[kafka-client] ' + message, extra); + }; + }, }); - this.producer.on('producer.disconnect', (err) => { - this.producerConnected = false; - this.logger.warn('The Producer has disconnected:', err); - clearTimeout(timer); - rejectProducer(err); + this.producer = this.client.producer(); + this.admin = this.client.admin(); + const timeout = this.config.timeout ?? 
5000; + + // waiting for producer to be ready + await new Promise((resolveProducer, rejectProducer) => { + const timer = setTimeout(() => { + const err = 'Connection timeout: Kafka host is unreachable'; + this.logger.error(err, this.config.kafka.brokers); + rejectProducer(err); + }, timeout); + + this.producer.on('producer.connect', () => { + this.producerConnected = true; + this.logger.info('The Producer is ready.'); + clearTimeout(timer); + resolveProducer(true); + }); + + this.producer.on('producer.disconnect', (err) => { + this.producerConnected = false; + this.logger.warn('The Producer has disconnected:', err); + clearTimeout(timer); + rejectProducer(err); + }); + + this.producer.on('producer.network.request_timeout', (err) => { + this.logger.warn('The Producer timed out:', err); + clearTimeout(timer); + rejectProducer(err); + }); + + this.producer.connect().catch(err => { + this.logger.warn('Producer connection error:', err); + }); + }).then(async () => { + this.admin.on('admin.connect', () => { + this.adminConnected = true; + resolveRetry(); + }); + this.admin.on('admin.disconnect', () => this.adminConnected = false); + + await this.admin.connect().catch( + err => { + this.logger.warn('Admin connection error:', err); + throw err; + } + ); }); - - this.producer.on('producer.network.request_timeout', (err) => { - this.logger.warn('The Producer timed out:', err); - clearTimeout(timer); - rejectProducer(err); - }); - }).then(async () => { - this.admin.on('admin.connect', () => this.adminConnected = true); - this.admin.on('admin.disconnect', () => this.adminConnected = false); - - await this.admin.connect().catch(err => { - this.logger.warn('Admin connection error: ' + err); - throw err; - }); - }).then(resolveRetry).catch(err => { + } + catch (err) { const attemptNo = (operation.attempts as () => number)(); - this.producer.disconnect(); + this.producer?.disconnect(); this.logger.info(`Retry initialize the Producer, attempt no: ${attemptNo}`); 
operation.retry(err); - }); + } }); }); }