Skip to content

Commit cbb89dc

Browse files
committed
Updating Kafka consumer to avro specific deserializer.
1 parent 096579a commit cbb89dc

File tree

5 files changed

+28
-18
lines changed

5 files changed

+28
-18
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ services.
2424
- Create new service for GraphQL client using Apollo library
2525
- Set up performance test for K6 in local and docker env
2626
- Integration test with Karate and Wiremock
27-
-
27+
- Retry and DLQ set up
2828

2929
### To build a docker image run below command:
3030
```shell
@@ -42,4 +42,4 @@ Import the project in IntelliJ and run
4242
or
4343
```shell
4444
mvnw springboot:run
45-
```
45+
```

pom.xml

+7-7
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@
4444
<artifactId>spring-boot-starter-web</artifactId>
4545
</dependency>
4646

47-
<dependency>
48-
<groupId>org.springframework.boot</groupId>
49-
<artifactId>spring-boot-devtools</artifactId>
50-
<scope>runtime</scope>
51-
<optional>true</optional>
52-
</dependency>
47+
<!-- <dependency>-->
48+
<!-- <groupId>org.springframework.boot</groupId>-->
49+
<!-- <artifactId>spring-boot-devtools</artifactId>-->
50+
<!-- <scope>runtime</scope>-->
51+
<!-- <optional>true</optional>-->
52+
<!-- </dependency>-->
5353

5454
<dependency>
5555
<groupId>org.springframework.boot</groupId>
@@ -267,4 +267,4 @@
267267
<url>https://packages.confluent.io/maven/</url>
268268
</repository>
269269
</repositories>
270-
</project>
270+
</project>
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.sarya.graphql.service.application.port.in;
22

3+
import com.sarya.graphql.service.ProductCreatedEvent;
34
import lombok.extern.slf4j.Slf4j;
4-
import org.apache.avro.generic.GenericRecord;
55
import org.springframework.kafka.annotation.KafkaListener;
66
import org.springframework.stereotype.Component;
77

@@ -10,7 +10,7 @@
1010
public class ProductConsumer {
1111

1212
@KafkaListener(topics = {"create-product"}, groupId = "sample-service-group")
13-
public void consumeProducts(GenericRecord event) {
13+
public void consumeProducts(ProductCreatedEvent event) {
1414
log.info("message: {}", event);
1515
}
1616
}

src/main/java/com/sarya/graphql/service/infrastructure/config/KafkaConsumerConfig.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@
1717
@Slf4j
1818
public class KafkaConsumerConfig {
1919

20-
KafkaProperties kafkaProperties;
21-
2220
@Bean
2321
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
24-
ConcurrentKafkaListenerContainerFactory<String, ProductCreatedEvent> kafkaListenerContainerFactory() {
22+
ConcurrentKafkaListenerContainerFactory<String, ProductCreatedEvent> kafkaListenerContainerFactory(
23+
KafkaProperties kafkaProperties
24+
) {
2525
var consumerFactory = new DefaultKafkaConsumerFactory<String, ProductCreatedEvent>(
2626
kafkaProperties.buildConsumerProperties()
2727
);

src/main/resources/application.yml

+14-4
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,21 @@ spring:
1414
bootstrap-servers: localhost:9092
1515
group-id: sample-service-group
1616
auto-offset-reset: earliest
17-
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
18-
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
17+
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
18+
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
1919
properties:
2020
schema.registry.url: http://localhost:8081
21-
21+
specific:
22+
avro:
23+
reader: true
24+
spring:
25+
deserializer:
26+
key:
27+
delegate:
28+
class: org.apache.kafka.common.serialization.StringDeserializer
29+
value:
30+
delegate:
31+
class: io.confluent.kafka.serializers.KafkaAvroDeserializer
2232
dgs:
2333
reload: true
2434
graphql:
@@ -37,4 +47,4 @@ dgs:
3747

3848
#app:
3949
# topic:
40-
# product-topic: sample-topic
50+
# product-topic: sample-topic

0 commit comments

Comments
 (0)