This is an example of a Spring Cloud Stream processor using Kafka Streams support.
This example is a Spring Cloud Stream adaptation of this Kafka Streams sample: https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/test/java/io/confluent/examples/streams/StreamToTableJoinIntegrationTest.java
The application uses two inputs: a KStream for person records and a KStream for school records. It then performs a stream-to-stream (KStream-to-KStream) join on them.
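For orientation, a stream-to-stream join processor using the Kafka Streams binder's functional style could look roughly like the sketch below. This is a minimal sketch, not the actual code of this sample: the String key/value types, the value joiner, the 30-second join window, and the class name are illustrative assumptions; see KafkaStreamsTableJoin.java in this repository for the real implementation.

import java.time.Duration;
import java.util.function.BiFunction;

import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JoinProcessorSketch {

    // With the functional binding model this bean is bound as
    // process-in-0 (person), process-in-1 (school) and process-out-0.
    @Bean
    public BiFunction<KStream<String, String>, KStream<String, String>, KStream<String, String>> process() {
        return (person, school) -> person.join(
                school,
                // hypothetical value joiner: merge the two values into one output record
                (personValue, schoolValue) -> personValue + " attends " + schoolValue,
                // only records whose keys match within this window are joined
                JoinWindows.of(Duration.ofSeconds(30)));
    }
}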
Go to the root of the repository and run:
docker-compose up -d
./mvnw clean package
java -jar target/kafka-streams-join-0.0.1-SNAPSHOT.jar
Steps to reproduce:
1) Run ZooKeeper and a Kafka broker, either standalone or via docker-compose
2) Create the following topics:
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person
.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic school
3) Run the standalone Producer.java to generate messages for the person and school topics (a hedged sketch of such a producer is shown after this list)
4) Optional: to observe the messages on the topics, run
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic person --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --property print.key=true --property key.separator="-" --property print.timestamp=true
.\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic school --key-deserializer org.apache.kafka.common.serialization.StringDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer --property print.key=true --property key.separator="-" --property print.timestamp=true
5) Run KafkaStreamsTableJoin.java as the main application from the IDE, or run java -jar target/kafka-streams-join-0.0.1-SNAPSHOT.jar
6) Observe the result: messages with composite keys are never joined, while messages with simple primitive keys are joined.
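For reference, a standalone producer along the lines of Producer.java (step 3 above) might look like the sketch below. The topic names match the ones created in step 2; the keys, values, and plain String serializers are illustrative assumptions and may differ from the actual Producer.java in this repository.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // use the same key on both topics so the join in the processor can match the records
            producer.send(new ProducerRecord<>("person", "alice", "Alice"));
            producer.send(new ProducerRecord<>("school", "alice", "Some University"));
        }
    }
}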