Skip to content

Commit

Permalink
fix(kafka_client): catch KafkaJSNumberOfRetriesExceeded from new Kafk…
Browse files Browse the repository at this point in the history
…aJS()

new KafkaJS() has its own retry routine integrated and throws KafkaJSNumberOfRetriesExceeded when
only 5 retries of 300ms exceeded. This Exception was not catched and also canceled our wrapped
retry.attmept. Increasing the initialRetryTime and the number of retries could already solve the
issue. However, we wrap a forever retry for just in case.
  • Loading branch information
Gerald Baulig committed Sep 5, 2023
1 parent 8c2ed99 commit 7939cb2
Showing 1 changed file with 70 additions and 54 deletions.
124 changes: 70 additions & 54 deletions packages/kafka-client/src/events/provider/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -528,68 +528,84 @@ export class Kafka {
* Suspends the calling function until the producer is connected.
*/
async start(): Promise<void> {
const operation = retry.operation({forever: true, maxTimeout: 2000});
const operation = retry.operation({
forever: true,
maxTimeout: 5000,
});
return new Promise<void>((resolveRetry) => {
operation.attempt(async () => {
this.client = new KafkaJS({
...this.config.kafka,
logCreator: () => {
return ({level, log}) => {
const {message, ...extra} = log;
this.logger.log(toWinstonLogLevel(level), '[kafka-client] ' + message, extra);
};
},
});

this.producer = this.client.producer();
this.admin = this.client.admin();
const timeout = this.config.timeout || 2000;

this.producer.connect().catch(err => {
this.logger.warn('Producer connection error: ' + err);
});

// waiting for producer to be ready
await new Promise((resolveProducer, rejectProducer) => {
const timer = setTimeout(() => {
const err = 'Connection timeout: Kafka host is unreachable';
this.logger.error(err, this.config.kafka.brokers);
rejectProducer(err);
}, timeout);

this.producer.on('producer.connect', () => {
this.producerConnected = true;
this.logger.info('The Producer is ready.');
clearTimeout(timer);
resolveProducer(true);
try {
this.client = new KafkaJS({
retry: {
initialRetryTime: 1000,
maxRetryTime: 10000,
retries: 100,
},
...this.config.kafka,
logCreator: () => {
return ({level, log}) => {
const {message, ...extra} = log;
this.logger.log(toWinstonLogLevel(level), '[kafka-client] ' + message, extra);
};
},
});

this.producer.on('producer.disconnect', (err) => {
this.producerConnected = false;
this.logger.warn('The Producer has disconnected:', err);
clearTimeout(timer);
rejectProducer(err);
this.producer = this.client.producer();
this.admin = this.client.admin();
const timeout = this.config.timeout ?? 5000;

// waiting for producer to be ready
await new Promise((resolveProducer, rejectProducer) => {
const timer = setTimeout(() => {
const err = 'Connection timeout: Kafka host is unreachable';
this.logger.error(err, this.config.kafka.brokers);
rejectProducer(err);
}, timeout);

this.producer.on('producer.connect', () => {
this.producerConnected = true;
this.logger.info('The Producer is ready.');
clearTimeout(timer);
resolveProducer(true);
});

this.producer.on('producer.disconnect', (err) => {
this.producerConnected = false;
this.logger.warn('The Producer has disconnected:', err);
clearTimeout(timer);
rejectProducer(err);
});

this.producer.on('producer.network.request_timeout', (err) => {
this.logger.warn('The Producer timed out:', err);
clearTimeout(timer);
rejectProducer(err);
});

this.producer.connect().catch(err => {
this.logger.warn('Producer connection error:', err);
});
}).then(async () => {
this.admin.on('admin.connect', () => {
this.adminConnected = true;
resolveRetry();
});
this.admin.on('admin.disconnect', () => this.adminConnected = false);

await this.admin.connect().catch(
err => {
this.logger.warn('Admin connection error:', err);
throw err;
}
);
});

this.producer.on('producer.network.request_timeout', (err) => {
this.logger.warn('The Producer timed out:', err);
clearTimeout(timer);
rejectProducer(err);
});
}).then(async () => {
this.admin.on('admin.connect', () => this.adminConnected = true);
this.admin.on('admin.disconnect', () => this.adminConnected = false);

await this.admin.connect().catch(err => {
this.logger.warn('Admin connection error: ' + err);
throw err;
});
}).then(resolveRetry).catch(err => {
}
catch (err) {
const attemptNo = (operation.attempts as () => number)();
this.producer.disconnect();
this.producer?.disconnect();
this.logger.info(`Retry initialize the Producer, attempt no: ${attemptNo}`);
operation.retry(err);
});
}
});
});
}
Expand Down

0 comments on commit 7939cb2

Please sign in to comment.