Skip to content

Commit

Permalink
fix(kafka-client): Add support for manual commit
Browse files Browse the repository at this point in the history
  • Loading branch information
Arun-KumarH committed Apr 15, 2024
2 parents 4735c1c + 0f39d96 commit 7acd18f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 16 deletions.
4 changes: 2 additions & 2 deletions packages/kafka-client/src/events/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export class Events {
* @param {string} name Topic name
* @return {Topic} Topic
*/
topic(name: string): Promise<Topic> {
topic(name: string, manualOffsetCommit?: boolean): Promise<Topic> {
if (_.isNil(name)) {
throw new Error('missing argument name');
}
Expand All @@ -107,6 +107,6 @@ export class Events {
}
// topic() api called inside Local / Kafka class - which then
// invokes the actual topic constructor
return this.provider.topic(name, this.config);
return this.provider.topic(name, this.config, manualOffsetCommit || false);
}
}
47 changes: 33 additions & 14 deletions packages/kafka-client/src/events/provider/kafka/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export class Topic {
drainEvent: (context: MessageWithContext, done: (err) => void) => void;
// default process one message at at time
asyncLimit = 1;
manualOffsetCommit: boolean;

/**
* Kafka topic.
Expand All @@ -53,14 +54,15 @@ export class Topic {
* @param provider
* @param config
*/
constructor(name: string, provider: Kafka, config: any) {
constructor(name: string, provider: Kafka, config: any, manualOffsetCommit = false) {
this.name = name;
this.emitter = new EventEmitter();
this.provider = provider;
this.subscribed = [];
this.waitQueue = [];
this.currentOffset = 0;
this.config = config;
this.manualOffsetCommit = manualOffsetCommit;
}

async createIfNotExists(): Promise<void> {
Expand Down Expand Up @@ -392,19 +394,22 @@ export class Topic {
}

private commit(): any {
this.commitCurrentOffsets().then(() => {
this.provider.logger.verbose('Offsets committed successfully');
}).catch(error => {
this.provider.logger.warn('Failed to commit offsets, resuming anyway after:', error);
// Fix for kafkaJS onCrash issue for KafkaJSNonRetriableError, to reset the consumers
this.provider.logger.warn('Commit error name', { name: error.name });
this.provider.logger.warn('Commit error message', { message: error.message });
if ((error.name === 'KafkaJSNonRetriableError' || error.name === 'KafkaJSError') && error.message === 'The coordinator is not aware of this member') {
this.provider.logger.info('Reset Consumer connection due to KafkaJSNonRetriableError');
this.$resetConsumer(this.subscribed, this.currentOffset);
this.provider.logger.info('Consumer connection reset successfully');
}
});
// Check if manual offset commit is enabled
if (!this.manualOffsetCommit) {
this.commitCurrentOffsets().then(() => {
this.provider.logger.verbose('Offsets committed successfully');
}).catch(error => {
this.provider.logger.warn('Failed to commit offsets, resuming anyway after:', error);
// Fix for kafkaJS onCrash issue for KafkaJSNonRetriableError, to reset the consumers
this.provider.logger.warn('Commit error name', { name: error.name });
this.provider.logger.warn('Commit error message', { message: error.message });
if ((error.name === 'KafkaJSNonRetriableError' || error.name === 'KafkaJSError') && error.message === 'The coordinator is not aware of this member') {
this.provider.logger.info('Reset Consumer connection due to KafkaJSNonRetriableError');
this.$resetConsumer(this.subscribed, this.currentOffset);
this.provider.logger.info('Consumer connection reset successfully');
}
});
}
}

async commitCurrentOffsets(): Promise<void> {
Expand All @@ -422,6 +427,20 @@ export class Topic {
});
}

/**
* Manually commit the current offset.
*/
async commitOffset(): Promise<void> {
try {
// Commit the current offset
await this.commitCurrentOffsets();
this.provider.logger.verbose('Offset committed manually');
} catch (error) {
this.provider.logger.error('Failed to commit offset manually', { code: error.code, message: error.message, stack: error.stack });
throw error;
}
}

/**
* Internal function for receiving event messages from Kafka and
* forwarding them to local listeners.
Expand Down
36 changes: 36 additions & 0 deletions packages/kafka-client/test/kafka.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ describe('Kafka provider test', () => {
const client: Events = new Events(kafkaConfig.events.kafka, logger);
const topicName = 'com.example.test';
const eventName = 'exampleEvent';
let initialOffset: number;
before(async () => {
// start the client
await client.start();
initialOffset = await (await client.topic(topicName)).$offset(-1);
});
after(async function() {
// stop the client
Expand Down Expand Up @@ -113,4 +115,38 @@ describe('Kafka provider test', () => {
countArr.length.should.equal(5);
});
});

describe('Manual Commit', () => {
it('should manually commit offset after processing message', async () => {
// Create topic object
const topic: Topic = await client.topic(topicName);
let offset: number;

// Subscribe to topic for example-event with listener as callback.
await topic.on(eventName, async (message, context) => {
// Ensure that message is processed
should.exist(message);
// Simulate processing time
await new Promise(resolve => setTimeout(resolve, 1000));

// Manually commit offset after processing the message
await topic.commitCurrentOffsets();
});

// Get the current offset
offset = await topic.$offset(-1);

// Emit the message to Kafka
await topic.emit(eventName, { value: 'value', count: 1 });

// Wait for processing to complete
await new Promise(resolve => setTimeout(resolve, 2000));

// Get the latest offset after processing
const finalOffset = await topic.$offset(-1);

// Verify that offset has been manually committed and updated accordingly
should(finalOffset).be.above(initialOffset);
});
});
});

0 comments on commit 7acd18f

Please sign in to comment.