Skip to content

Commit 84565da

Browse files
authored
Use Map instead of Properties (#91)
1 parent f6cb911 commit 84565da

File tree

32 files changed

+550
-285
lines changed

32 files changed

+550
-285
lines changed

fluent-kafka-streams-tests-junit4/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit4/TestTopologyRule.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* MIT License
33
*
4-
* Copyright (c) 2023 bakdata
4+
* Copyright (c) 2024 bakdata
55
*
66
* Permission is hereby granted, free of charge, to any person obtaining a copy
77
* of this software and associated documentation files (the "Software"), to deal
@@ -27,7 +27,6 @@
2727
import com.bakdata.fluent_kafka_streams_tests.TestTopology;
2828
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
2929
import java.util.Map;
30-
import java.util.Properties;
3130
import java.util.function.Function;
3231
import java.util.function.Supplier;
3332
import lombok.Getter;
@@ -72,39 +71,39 @@
7271
public class TestTopologyRule<DefaultK, DefaultV> extends TestTopology<DefaultK, DefaultV>
7372
implements TestRule {
7473
public TestTopologyRule(
75-
final Function<? super Properties, ? extends Topology> topologyFactory,
76-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory) {
74+
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
75+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory) {
7776
super(topologyFactory, propertiesFactory);
7877
}
7978
public TestTopologyRule(
80-
final Function<? super Properties, ? extends Topology> topologyFactory,
81-
final Map<Object, Object> properties) {
79+
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
80+
final Map<String, Object> properties) {
8281
super(topologyFactory, properties);
8382
}
8483

8584
public TestTopologyRule(
8685
final Supplier<? extends Topology> topologyFactory,
87-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory) {
86+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory) {
8887
super(topologyFactory, propertiesFactory);
8988
}
9089

9190
public TestTopologyRule(
92-
final Supplier<? extends Topology> topologyFactory, final Map<Object, Object> properties) {
91+
final Supplier<? extends Topology> topologyFactory, final Map<String, Object> properties) {
9392
super(topologyFactory, properties);
9493
}
9594

9695
public TestTopologyRule(final Topology topology,
97-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory) {
96+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory) {
9897
super(topology, propertiesFactory);
9998
}
10099

101-
public TestTopologyRule(final Topology topology, final Map<Object, Object> properties) {
100+
public TestTopologyRule(final Topology topology, final Map<String, Object> properties) {
102101
super(topology, properties);
103102
}
104103

105104
protected TestTopologyRule(
106-
final Function<? super Properties, ? extends Topology> topologyFactory,
107-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory,
105+
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
106+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory,
108107
final Serde<DefaultK> defaultKeySerde, final Serde<DefaultV> defaultValueSerde,
109108
final SchemaRegistryMock schemaRegistryMock) {
110109
super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryMock);
@@ -126,8 +125,9 @@ public void evaluate() throws Throwable {
126125
}
127126

128127
@Override
129-
protected <K, V> TestTopologyRule<K, V> with(final Function<? super Properties, ? extends Topology> topologyFactory,
130-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
128+
protected <K, V> TestTopologyRule<K, V> with(
129+
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
130+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
131131
final Serde<V> defaultValueSerde,
132132
final SchemaRegistryMock schemaRegistryMock) {
133133
return new TestTopologyRule<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde,

fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class WordCountTest {
4141

4242
@Rule
4343
public final TestTopologyRule<Object, String> testTopology =
44-
new TestTopologyRule<>(this.app::getTopology, this.app.getKafkaProperties());
44+
new TestTopologyRule<>(this.app::getTopology, WordCount.getKafkaProperties());
4545

4646
@Test
4747
public void shouldAggregateSameWordStream() {

fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWithStaticTopologyTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* MIT License
33
*
4-
* Copyright (c) 2019 bakdata GmbH
4+
* Copyright (c) 2024 bakdata
55
*
66
* Permission is hereby granted, free of charge, to any person obtaining a copy
77
* of this software and associated documentation files (the "Software"), to deal
@@ -34,7 +34,7 @@ public class WordCountWithStaticTopologyTest {
3434

3535
@Rule
3636
public final TestTopologyRule<Object, String> testTopology = new TestTopologyRule<>(this.app.getTopology(),
37-
this.app.getKafkaProperties());
37+
WordCount.getKafkaProperties());
3838

3939
@Test
4040
public void shouldAggregateSameWordStream() {

fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/WordCountWitherTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* MIT License
33
*
4-
* Copyright (c) 2019 bakdata GmbH
4+
* Copyright (c) 2024 bakdata
55
*
66
* Permission is hereby granted, free of charge, to any person obtaining a copy
77
* of this software and associated documentation files (the "Software"), to deal
@@ -37,7 +37,7 @@ public class WordCountWitherTest {
3737

3838
@Rule
3939
public final TestTopologyRule<Object, String> testTopology =
40-
new TestTopologyRule<>(this.app.getTopology(), this.app.getKafkaProperties())
40+
new TestTopologyRule<>(this.app.getTopology(), WordCount.getKafkaProperties())
4141
.withDefaultValueSerde(Serdes.String())
4242
.withSchemaRegistryMock(new SchemaRegistryMock(List.of(new AvroSchemaProvider())));
4343

fluent-kafka-streams-tests-junit4/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit4/test_applications/WordCount.java

+34-17
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,57 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2024 bakdata
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
125
package com.bakdata.fluent_kafka_streams_tests.junit4.test_applications;
226

327
import java.util.Arrays;
4-
import java.util.Properties;
28+
import java.util.HashMap;
29+
import java.util.Map;
530
import java.util.regex.Pattern;
631
import lombok.Getter;
732
import org.apache.kafka.common.serialization.Serde;
833
import org.apache.kafka.common.serialization.Serdes;
9-
import org.apache.kafka.streams.KafkaStreams;
34+
import org.apache.kafka.common.serialization.Serdes.StringSerde;
1035
import org.apache.kafka.streams.StreamsBuilder;
1136
import org.apache.kafka.streams.StreamsConfig;
1237
import org.apache.kafka.streams.Topology;
1338
import org.apache.kafka.streams.kstream.KStream;
1439
import org.apache.kafka.streams.kstream.KTable;
1540
import org.apache.kafka.streams.kstream.Produced;
1641

42+
@Getter
1743
public class WordCount {
18-
@Getter
1944
private final String inputTopic = "wordcount-input";
2045

21-
@Getter
2246
private final String outputTopic = "wordcount-output";
2347

24-
public static void main(final String[] args) {
25-
final WordCount wordCount = new WordCount();
26-
final KafkaStreams streams = new KafkaStreams(wordCount.getTopology(), wordCount.getKafkaProperties());
27-
streams.start();
28-
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
29-
}
30-
31-
public Properties getKafkaProperties() {
48+
public static Map<String, Object> getKafkaProperties() {
3249
final String brokers = "localhost:9092";
33-
final Properties kafkaConfig = new Properties();
34-
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
35-
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
36-
kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
37-
kafkaConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
50+
final Map<String, Object> kafkaConfig = new HashMap<>();
51+
kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
52+
kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
53+
kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
54+
kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
3855
return kafkaConfig;
3956
}
4057

fluent-kafka-streams-tests-junit5/src/main/java/com/bakdata/fluent_kafka_streams_tests/junit5/TestTopologyExtension.java

+14-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* MIT License
33
*
4-
* Copyright (c) 2023 bakdata
4+
* Copyright (c) 2024 bakdata
55
*
66
* Permission is hereby granted, free of charge, to any person obtaining a copy
77
* of this software and associated documentation files (the "Software"), to deal
@@ -27,7 +27,6 @@
2727
import com.bakdata.fluent_kafka_streams_tests.TestTopology;
2828
import com.bakdata.schemaregistrymock.SchemaRegistryMock;
2929
import java.util.Map;
30-
import java.util.Properties;
3130
import java.util.function.Function;
3231
import java.util.function.Supplier;
3332
import lombok.Getter;
@@ -72,40 +71,40 @@
7271
public class TestTopologyExtension<DefaultK, DefaultV> extends TestTopology<DefaultK, DefaultV>
7372
implements BeforeEachCallback, AfterEachCallback {
7473
public TestTopologyExtension(
75-
final Function<? super Properties, ? extends Topology> topologyFactory,
76-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory) {
74+
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
75+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory) {
7776
super(topologyFactory, propertiesFactory);
7877
}
7978

8079
public TestTopologyExtension(
81-
final Function<? super Properties, ? extends Topology> topologyFactory,
82-
final Map<Object, Object> properties) {
80+
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
81+
final Map<String, Object> properties) {
8382
super(topologyFactory, properties);
8483
}
8584

8685
public TestTopologyExtension(
8786
final Supplier<? extends Topology> topologyFactory,
88-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory) {
87+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory) {
8988
super(topologyFactory, propertiesFactory);
9089
}
9190

9291
public TestTopologyExtension(
93-
final Supplier<? extends Topology> topologyFactory, final Map<Object, Object> properties) {
92+
final Supplier<? extends Topology> topologyFactory, final Map<String, Object> properties) {
9493
super(topologyFactory, properties);
9594
}
9695

9796
public TestTopologyExtension(final Topology topology,
98-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory) {
97+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory) {
9998
super(topology, propertiesFactory);
10099
}
101100

102-
public TestTopologyExtension(final Topology topology, final Map<Object, Object> properties) {
101+
public TestTopologyExtension(final Topology topology, final Map<String, Object> properties) {
103102
super(topology, properties);
104103
}
105104

106105
protected TestTopologyExtension(
107-
final Function<? super Properties, ? extends Topology> topologyFactory,
108-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory,
106+
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
107+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory,
109108
final Serde<DefaultK> defaultKeySerde, final Serde<DefaultV> defaultValueSerde,
110109
final SchemaRegistryMock schemaRegistryMock) {
111110
super(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde, schemaRegistryMock);
@@ -122,8 +121,9 @@ public void beforeEach(final ExtensionContext context) {
122121
}
123122

124123
@Override
125-
protected <K, V> TestTopology<K, V> with(final Function<? super Properties, ? extends Topology> topologyFactory,
126-
final Function<? super String, ? extends Map<?, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
124+
protected <K, V> TestTopology<K, V> with(
125+
final Function<? super Map<String, Object>, ? extends Topology> topologyFactory,
126+
final Function<? super String, ? extends Map<String, ?>> propertiesFactory, final Serde<K> defaultKeySerde,
127127
final Serde<V> defaultValueSerde,
128128
final SchemaRegistryMock schemaRegistry) {
129129
return new TestTopologyExtension<>(topologyFactory, propertiesFactory, defaultKeySerde, defaultValueSerde,

fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ class WordCountTest {
4141

4242
@RegisterExtension
4343
final TestTopologyExtension<Object, String> testTopology = new TestTopologyExtension<>(this.app::getTopology,
44-
this.app.getKafkaProperties());
44+
WordCount.getKafkaProperties());
4545

4646
@Test
4747
void shouldAggregateSameWordStream() {

fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWithStaticTopologyTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* MIT License
33
*
4-
* Copyright (c) 2019 bakdata GmbH
4+
* Copyright (c) 2024 bakdata
55
*
66
* Permission is hereby granted, free of charge, to any person obtaining a copy
77
* of this software and associated documentation files (the "Software"), to deal
@@ -34,7 +34,7 @@ class WordCountWithStaticTopologyTest {
3434

3535
@RegisterExtension
3636
final TestTopologyExtension<Object, String> testTopology = new TestTopologyExtension<>(this.app.getTopology(),
37-
this.app.getKafkaProperties());
37+
WordCount.getKafkaProperties());
3838

3939
@Test
4040
void shouldAggregateSameWordStream() {

fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/WordCountWitherTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*
22
* MIT License
33
*
4-
* Copyright (c) 2019 bakdata GmbH
4+
* Copyright (c) 2024 bakdata
55
*
66
* Permission is hereby granted, free of charge, to any person obtaining a copy
77
* of this software and associated documentation files (the "Software"), to deal
@@ -37,7 +37,7 @@ class WordCountWitherTest {
3737

3838
@RegisterExtension
3939
final TestTopologyExtension<Object, String> testTopology =
40-
new TestTopologyExtension<>(this.app::getTopology, this.app.getKafkaProperties())
40+
new TestTopologyExtension<>(this.app::getTopology, WordCount.getKafkaProperties())
4141
.withDefaultValueSerde(Serdes.String())
4242
.withSchemaRegistryMock(new SchemaRegistryMock(List.of(new AvroSchemaProvider())));
4343

fluent-kafka-streams-tests-junit5/src/test/java/com/bakdata/fluent_kafka_streams_tests/junit5/test_applications/WordCount.java

+34-17
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,57 @@
1+
/*
2+
* MIT License
3+
*
4+
* Copyright (c) 2024 bakdata
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
*/
24+
125
package com.bakdata.fluent_kafka_streams_tests.junit5.test_applications;
226

327
import java.util.Arrays;
4-
import java.util.Properties;
28+
import java.util.HashMap;
29+
import java.util.Map;
530
import java.util.regex.Pattern;
631
import lombok.Getter;
732
import org.apache.kafka.common.serialization.Serde;
833
import org.apache.kafka.common.serialization.Serdes;
9-
import org.apache.kafka.streams.KafkaStreams;
34+
import org.apache.kafka.common.serialization.Serdes.StringSerde;
1035
import org.apache.kafka.streams.StreamsBuilder;
1136
import org.apache.kafka.streams.StreamsConfig;
1237
import org.apache.kafka.streams.Topology;
1338
import org.apache.kafka.streams.kstream.KStream;
1439
import org.apache.kafka.streams.kstream.KTable;
1540
import org.apache.kafka.streams.kstream.Produced;
1641

42+
@Getter
1743
public class WordCount {
18-
@Getter
1944
private final String inputTopic = "wordcount-input";
2045

21-
@Getter
2246
private final String outputTopic = "wordcount-output";
2347

24-
public static void main(final String[] args) {
25-
final WordCount wordCount = new WordCount();
26-
final KafkaStreams streams = new KafkaStreams(wordCount.getTopology(), wordCount.getKafkaProperties());
27-
streams.start();
28-
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
29-
}
30-
31-
public Properties getKafkaProperties() {
48+
public static Map<String, Object> getKafkaProperties() {
3249
final String brokers = "localhost:9092";
33-
final Properties kafkaConfig = new Properties();
34-
kafkaConfig.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
35-
kafkaConfig.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
36-
kafkaConfig.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
37-
kafkaConfig.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
50+
final Map<String, Object> kafkaConfig = new HashMap<>();
51+
kafkaConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount");
52+
kafkaConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
53+
kafkaConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringSerde.class);
54+
kafkaConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, StringSerde.class);
3855
return kafkaConfig;
3956
}
4057

0 commit comments

Comments
 (0)