Skip to content

Commit 4d4e64b

Browse files
committed
Revert "Initial/partial Kafka migration"
This reverts commit 767fc86.
1 parent 767fc86 commit 4d4e64b

File tree

9 files changed

+49
-59
lines changed

9 files changed

+49
-59
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/InteractiveQueryServiceMultiStateStoreTests.java

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.kafka.streams.state.StoreBuilder;
3232
import org.apache.kafka.streams.state.Stores;
3333
import org.junit.jupiter.api.BeforeAll;
34-
import org.junit.jupiter.api.Disabled;
3534
import org.junit.jupiter.api.Test;
3635
import org.mockito.Mockito;
3736
import org.slf4j.Logger;
@@ -62,7 +61,6 @@
6261
* @author Soby Chacko
6362
*/
6463
@EmbeddedKafka(topics = {"input1", "input2"})
65-
@Disabled
6664
class InteractiveQueryServiceMultiStateStoreTests {
6765

6866
private static final String STORE_1_NAME = "store1";
@@ -202,25 +200,25 @@ public StoreBuilder<KeyValueStore<String, String>> store1() {
202200
Stores.persistentKeyValueStore(STORE_1_NAME), Serdes.String(), Serdes.String());
203201
}
204202

205-
// @Bean
206-
// public Consumer<KStream<String, String>> app1() {
207-
// return s -> s
208-
// .transformValues(EchoTransformer::new, STORE_1_NAME)
209-
// .foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_1_NAME));
210-
// }
203+
@Bean
204+
public Consumer<KStream<String, String>> app1() {
205+
return s -> s
206+
.transformValues(EchoTransformer::new, STORE_1_NAME)
207+
.foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_1_NAME));
208+
}
211209

212210
@Bean
213211
public StoreBuilder<KeyValueStore<String, String>> store2() {
214212
return Stores.keyValueStoreBuilder(
215213
Stores.persistentKeyValueStore(STORE_2_NAME), Serdes.String(), Serdes.String());
216214
}
217215

218-
// @Bean
219-
// public Consumer<KStream<String, String>> app2() {
220-
// return s -> s
221-
// .transformValues(EchoTransformer::new, STORE_2_NAME)
222-
// .foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_2_NAME));
223-
// }
216+
@Bean
217+
public Consumer<KStream<String, String>> app2() {
218+
return s -> s
219+
.transformValues(EchoTransformer::new, STORE_2_NAME)
220+
.foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_2_NAME));
221+
}
224222

225223
@Bean
226224
public CleanupConfig cleanupConfig() {
@@ -285,12 +283,12 @@ public StoreBuilder<KeyValueStore<String, String>> store1() {
285283
Stores.persistentKeyValueStore(STORE_1_NAME), Serdes.String(), Serdes.String());
286284
}
287285

288-
// @Bean
289-
// public Consumer<KStream<String, String>> app1() {
290-
// return s -> s
291-
// .transformValues(EchoTransformer::new, STORE_1_NAME)
292-
// .foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_1_NAME));
293-
// }
286+
@Bean
287+
public Consumer<KStream<String, String>> app1() {
288+
return s -> s
289+
.transformValues(EchoTransformer::new, STORE_1_NAME)
290+
.foreach((k, v) -> log.info("Echo {} -> {} into {}", k, v, STORE_1_NAME));
291+
}
294292

295293
@Bean
296294
public CleanupConfig cleanupConfig() {

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/MultipleFunctionsInSameAppTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,9 @@ public static class MultipleFunctionsInSameApp {
218218

219219
@Bean
220220
public Function<KStream<String, String>, KStream<String, String>[]> processItem() {
221-
return input -> input.split().branch(
222-
(s, p) -> p.equalsIgnoreCase("coffee");
223-
// (s, p) -> p.equalsIgnoreCase("electronics"));
221+
return input -> input.branch(
222+
(s, p) -> p.equalsIgnoreCase("coffee"),
223+
(s, p) -> p.equalsIgnoreCase("electronics"));
224224
}
225225

226226
// Testing for the scenario under https://github.com/spring-cloud/spring-cloud-stream/issues/2817

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountBranchesFunctionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
193193
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
194194
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
195195
.groupBy((key, value) -> value)
196-
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(5)))
196+
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
197197
.count(Materialized.as("WordCounts-branch"))
198198
.toStream()
199199
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/function/KafkaStreamsBinderWordCountFunctionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ Function<KStream<Object, String>, KStream<String, WordCount>> process() {
401401
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
402402
.map((key, value) -> new KeyValue<>(value, value))
403403
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
404-
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(5000)))
404+
.windowedBy(TimeWindows.of(Duration.ofMillis(5000)))
405405
.count(Materialized.as("foo-WordCounts"))
406406
.toStream()
407407
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingDisabledTests.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.kafka.streams.kstream.TimeWindows;
3737
import org.junit.AfterClass;
3838
import org.junit.BeforeClass;
39+
import org.junit.ClassRule;
3940
import org.junit.Test;
4041
import org.junit.runner.RunWith;
4142

@@ -47,7 +48,7 @@
4748
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4849
import org.springframework.kafka.core.KafkaTemplate;
4950
import org.springframework.kafka.test.EmbeddedKafkaBroker;
50-
import org.springframework.kafka.test.context.EmbeddedKafka;
51+
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
5152
import org.springframework.kafka.test.utils.KafkaTestUtils;
5253
import org.springframework.messaging.Message;
5354
import org.springframework.messaging.support.MessageBuilder;
@@ -65,20 +66,17 @@
6566
@RunWith(SpringRunner.class)
6667
@ContextConfiguration
6768
@DirtiesContext
68-
@EmbeddedKafka(topics = {"decode-counts", "decode-counts-1"})
6969
public abstract class KafkaStreamsNativeEncodingDecodingDisabledTests {
7070

71-
// /**
72-
// * Kafka rule.
73-
// */
74-
// @ClassRule
75-
// public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
76-
// "decode-counts", "decode-counts-1");
77-
//
78-
// private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
79-
// .getEmbeddedKafka();
80-
81-
private static EmbeddedKafkaBroker embeddedKafka;
71+
/**
72+
* Kafka rule.
73+
*/
74+
@ClassRule
75+
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
76+
"decode-counts", "decode-counts-1");
77+
78+
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
79+
.getEmbeddedKafka();
8280

8381
@SpyBean
8482
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate conversionDelegate;

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsNativeEncodingDecodingEnabledTests.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.kafka.streams.kstream.TimeWindows;
3434
import org.junit.AfterClass;
3535
import org.junit.BeforeClass;
36+
import org.junit.ClassRule;
3637
import org.junit.Test;
3738
import org.junit.runner.RunWith;
3839

@@ -44,7 +45,7 @@
4445
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
4546
import org.springframework.kafka.core.KafkaTemplate;
4647
import org.springframework.kafka.test.EmbeddedKafkaBroker;
47-
import org.springframework.kafka.test.context.EmbeddedKafka;
48+
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
4849
import org.springframework.kafka.test.utils.KafkaTestUtils;
4950
import org.springframework.test.annotation.DirtiesContext;
5051
import org.springframework.test.context.ContextConfiguration;
@@ -61,19 +62,17 @@
6162
@RunWith(SpringRunner.class)
6263
@ContextConfiguration
6364
@DirtiesContext
64-
@EmbeddedKafka(topics = {"decode-counts", "decode-counts-1"})
6565
public abstract class KafkaStreamsNativeEncodingDecodingEnabledTests {
6666

67-
// /**
68-
// * Kafka rule.
69-
// */
70-
// @ClassRule
71-
// public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
72-
// "decode-counts", "decode-counts-1");
67+
/**
68+
* Kafka rule.
69+
*/
70+
@ClassRule
71+
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true,
72+
"decode-counts", "decode-counts-1");
7373

74-
private static EmbeddedKafkaBroker embeddedKafka;
75-
// = embeddedKafkaRule
76-
// .getEmbeddedKafka();
74+
private static EmbeddedKafkaBroker embeddedKafka = embeddedKafkaRule
75+
.getEmbeddedKafka();
7776

7877
@SpyBean
7978
org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate conversionDelegate;

binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/integration/KafkaStreamsStateStoreIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323

2424
import org.apache.kafka.common.serialization.Serdes;
2525
import org.apache.kafka.streams.kstream.KStream;
26+
import org.apache.kafka.streams.processor.Processor;
2627
import org.apache.kafka.streams.processor.ProcessorContext;
27-
import org.apache.kafka.streams.processor.api.Processor;
2828
import org.apache.kafka.streams.state.StoreBuilder;
2929
import org.apache.kafka.streams.state.Stores;
3030
import org.apache.kafka.streams.state.WindowStore;

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderMetricsTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,9 @@ void shouldNotContainAnyMetricsWhenUsingNoopGauge() {
163163
MeterFilter.denyNameStartsWith("spring.cloud.stream.binder.kafka.offset"));
164164

165165
// Because we have NoopGauge for the offset metric in the meter registry, none of these expectations matter.
166-
// org.mockito.BDDMockito
167-
// .given(consumer.committed(ArgumentMatchers.anySet(TopicPartition.class)))
168-
// .willReturn(new OffsetAndMetadata(500));
166+
org.mockito.BDDMockito
167+
.given(consumer.committed(ArgumentMatchers.any(TopicPartition.class)))
168+
.willReturn(new OffsetAndMetadata(500));
169169
List<PartitionInfo> partitions = partitions(new Node(0, null, 0));
170170
topicsInUse.put(
171171
TEST_TOPIC,

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
import org.awaitility.Awaitility;
7373
import org.junit.jupiter.api.BeforeAll;
7474
import org.junit.jupiter.api.BeforeEach;
75-
import org.junit.jupiter.api.Disabled;
7675
import org.junit.jupiter.api.Test;
7776
import org.junit.jupiter.api.TestInfo;
7877
import org.mockito.ArgumentMatchers;
@@ -711,7 +710,6 @@ public void testSendAndReceive(TestInfo testInfo) throws Exception {
711710
}
712711

713712
@Test
714-
@Disabled
715713
@SuppressWarnings({ "unchecked", "rawtypes" })
716714
void testSendAndReceiveBatch() throws Exception {
717715
Binder binder = getBinder();
@@ -1628,7 +1626,6 @@ void autoCommitOnErrorWhenManualAcknowledgement() throws Exception {
16281626
}
16291627

16301628
@Test
1631-
@Disabled
16321629
@SuppressWarnings("unchecked")
16331630
void configurableDlqName() throws Exception {
16341631
Binder binder = getBinder();
@@ -2275,7 +2272,6 @@ void partitionedModuleJava() throws Exception {
22752272

22762273
@Test
22772274
@Override
2278-
@Disabled
22792275
@SuppressWarnings("unchecked")
22802276
public void testAnonymousGroup(TestInfo testInfo) throws Exception {
22812277
Binder binder = getBinder();
@@ -2772,7 +2768,6 @@ void dynamicKeyExpression() throws Exception {
27722768
}
27732769

27742770
@Test
2775-
@Disabled
27762771
@SuppressWarnings("unchecked")
27772772
void customPartitionCountOverridesPartitioningIfLarger() throws Exception {
27782773
var testPayload = new byte[2048];
@@ -3558,7 +3553,7 @@ void testSendAndReceiveWithMixedMode() throws Exception {
35583553
Consumer consumer = cf.createConsumer();
35593554
consumer.subscribe(Collections.singletonList("mixed.0"));
35603555

3561-
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
3556+
ConsumerRecords records = consumer.poll(10_1000);
35623557
Iterator<ConsumerRecord> iterator = records.iterator();
35633558
ConsumerRecord record = iterator.next();
35643559
byte[] value = (byte[]) record.value();

0 commit comments

Comments
 (0)