diff --git a/packages/kafka-client/src/events/provider/kafka/index.ts b/packages/kafka-client/src/events/provider/kafka/index.ts index 209baa04..cb6419d9 100644 --- a/packages/kafka-client/src/events/provider/kafka/index.ts +++ b/packages/kafka-client/src/events/provider/kafka/index.ts @@ -528,68 +528,84 @@ export class Kafka { * Suspends the calling function until the producer is connected. */ async start(): Promise { - const operation = retry.operation({forever: true, maxTimeout: 2000}); + const operation = retry.operation({ + forever: true, + maxTimeout: 5000, + }); return new Promise((resolveRetry) => { operation.attempt(async () => { - this.client = new KafkaJS({ - ...this.config.kafka, - logCreator: () => { - return ({level, log}) => { - const {message, ...extra} = log; - this.logger.log(toWinstonLogLevel(level), '[kafka-client] ' + message, extra); - }; - }, - }); - - this.producer = this.client.producer(); - this.admin = this.client.admin(); - const timeout = this.config.timeout || 2000; - - this.producer.connect().catch(err => { - this.logger.warn('Producer connection error: ' + err); - }); - - // waiting for producer to be ready - await new Promise((resolveProducer, rejectProducer) => { - const timer = setTimeout(() => { - const err = 'Connection timeout: Kafka host is unreachable'; - this.logger.error(err, this.config.kafka.brokers); - rejectProducer(err); - }, timeout); - - this.producer.on('producer.connect', () => { - this.producerConnected = true; - this.logger.info('The Producer is ready.'); - clearTimeout(timer); - resolveProducer(true); + try { + this.client = new KafkaJS({ + retry: { + initialRetryTime: 1000, + maxRetryTime: 10000, + retries: 100, + }, + ...this.config.kafka, + logCreator: () => { + return ({level, log}) => { + const {message, ...extra} = log; + this.logger.log(toWinstonLogLevel(level), '[kafka-client] ' + message, extra); + }; + }, }); - this.producer.on('producer.disconnect', (err) => { - this.producerConnected = false; - this.logger.warn('The Producer has disconnected:', err); - clearTimeout(timer); - rejectProducer(err); + this.producer = this.client.producer(); + this.admin = this.client.admin(); + const timeout = this.config.timeout ?? 5000; + + // waiting for producer to be ready + await new Promise((resolveProducer, rejectProducer) => { + const timer = setTimeout(() => { + const err = 'Connection timeout: Kafka host is unreachable'; + this.logger.error(err, this.config.kafka.brokers); + rejectProducer(err); + }, timeout); + + this.producer.on('producer.connect', () => { + this.producerConnected = true; + this.logger.info('The Producer is ready.'); + clearTimeout(timer); + resolveProducer(true); + }); + + this.producer.on('producer.disconnect', (err) => { + this.producerConnected = false; + this.logger.warn('The Producer has disconnected:', err); + clearTimeout(timer); + rejectProducer(err); + }); + + this.producer.on('producer.network.request_timeout', (err) => { + this.logger.warn('The Producer timed out:', err); + clearTimeout(timer); + rejectProducer(err); + }); + + this.producer.connect().catch(err => { + this.logger.warn('Producer connection error:', err); + }); + }).then(async () => { + this.admin.on('admin.connect', () => { + this.adminConnected = true; + resolveRetry(); + }); + this.admin.on('admin.disconnect', () => this.adminConnected = false); + + await this.admin.connect().catch( + err => { + this.logger.warn('Admin connection error:', err); + throw err; + } + ); }); - - this.producer.on('producer.network.request_timeout', (err) => { - this.logger.warn('The Producer timed out:', err); - clearTimeout(timer); - rejectProducer(err); - }); - }).then(async () => { - this.admin.on('admin.connect', () => this.adminConnected = true); - this.admin.on('admin.disconnect', () => this.adminConnected = false); - - await this.admin.connect().catch(err => { - this.logger.warn('Admin connection error: ' + err); - throw err; - }); - }).then(resolveRetry).catch(err => { + } + catch (err) { const attemptNo = (operation.attempts as () => number)(); - this.producer.disconnect(); + this.producer?.disconnect(); this.logger.info(`Retry initialize the Producer, attempt no: ${attemptNo}`); operation.retry(err); - }); + } }); }); }