diff --git a/kafka-streams-utils/build.gradle.kts b/kafka-streams-utils/build.gradle.kts index 2fa8d41..52249d4 100644 --- a/kafka-streams-utils/build.gradle.kts +++ b/kafka-streams-utils/build.gradle.kts @@ -17,4 +17,5 @@ dependencies { testImplementation(libs.mockito.core) testImplementation(libs.mockito.junit) testImplementation(libs.logcaptor) + testImplementation(group = "org.apache.kafka", name = "kafka-streams-test-utils") } diff --git a/kafka-streams-utils/src/main/java/com/bakdata/kafka/streams/kstream/StoresX.java b/kafka-streams-utils/src/main/java/com/bakdata/kafka/streams/kstream/StoresX.java new file mode 100644 index 0000000..82b370c --- /dev/null +++ b/kafka-streams-utils/src/main/java/com/bakdata/kafka/streams/kstream/StoresX.java @@ -0,0 +1,161 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka.streams.kstream; + +import com.bakdata.kafka.Configurator; +import com.bakdata.kafka.Preconfigured; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionBytesStoreSupplier; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Use {@link Preconfigured} to lazily configure {@link Serde} for {@link Stores} using {@link Configurator} + */ +@RequiredArgsConstructor +public class StoresX { + + private final @NonNull Configurator configurator; + + /** + * @see Stores#sessionStoreBuilder(SessionBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> sessionStoreBuilder(final SessionBytesStoreSupplier supplier, + final Preconfigured> keySerde, final Preconfigured> valueSerde) { + return Stores.sessionStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + /** + * @see Stores#sessionStoreBuilder(SessionBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> sessionStoreBuilder(final SessionBytesStoreSupplier supplier, + final Serde keySerde, final Serde valueSerde) { + return this.sessionStoreBuilder(supplier, Preconfigured.create(keySerde), Preconfigured.create(valueSerde)); + } + + /** + * @see Stores#timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> timestampedWindowStoreBuilder( + final WindowBytesStoreSupplier supplier, final Preconfigured> keySerde, + final Preconfigured> valueSerde) { + return Stores.timestampedWindowStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + /** + * @see Stores#timestampedWindowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> timestampedWindowStoreBuilder( + final WindowBytesStoreSupplier supplier, final Serde keySerde, final Serde valueSerde) { + return this.timestampedWindowStoreBuilder(supplier, Preconfigured.create(keySerde), + Preconfigured.create(valueSerde)); + } + + /** + * @see Stores#windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> windowStoreBuilder(final WindowBytesStoreSupplier supplier, + final Preconfigured> keySerde, final Preconfigured> valueSerde) { + return Stores.windowStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + /** + * @see Stores#windowStoreBuilder(WindowBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> windowStoreBuilder(final WindowBytesStoreSupplier supplier, + final Serde keySerde, final Serde valueSerde) { + return this.windowStoreBuilder(supplier, Preconfigured.create(keySerde), Preconfigured.create(valueSerde)); + } + + /** + * @see Stores#versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> versionedKeyValueStoreBuilder( + final VersionedBytesStoreSupplier supplier, final Preconfigured> keySerde, + final Preconfigured> valueSerde) { + return Stores.versionedKeyValueStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + /** + * @see Stores#versionedKeyValueStoreBuilder(VersionedBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> versionedKeyValueStoreBuilder( + final VersionedBytesStoreSupplier supplier, final Serde keySerde, final Serde valueSerde) { + return this.versionedKeyValueStoreBuilder(supplier, Preconfigured.create(keySerde), + Preconfigured.create(valueSerde)); + } + + /** + * @see Stores#timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> timestampedKeyValueStoreBuilder( + final KeyValueBytesStoreSupplier supplier, final Preconfigured> keySerde, + final Preconfigured> valueSerde) { + return Stores.timestampedKeyValueStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + /** + * @see Stores#timestampedKeyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> timestampedKeyValueStoreBuilder( + final KeyValueBytesStoreSupplier supplier, final Serde keySerde, + final Serde valueSerde) { + return this.timestampedKeyValueStoreBuilder(supplier, Preconfigured.create(keySerde), + Preconfigured.create(valueSerde)); + } + + /** + * @see Stores#keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, + final Preconfigured> keySerde, final Preconfigured> valueSerde) { + return Stores.keyValueStoreBuilder(supplier, this.configurator.configureForKeys(keySerde), + this.configurator.configureForValues(valueSerde)); + } + + /** + * @see Stores#keyValueStoreBuilder(KeyValueBytesStoreSupplier, Serde, Serde) + */ + public StoreBuilder> keyValueStoreBuilder(final KeyValueBytesStoreSupplier supplier, + final Serde keySerde, final Serde valueSerde) { + return this.keyValueStoreBuilder(supplier, Preconfigured.create(keySerde), Preconfigured.create(valueSerde)); + } +} diff --git a/kafka-streams-utils/src/test/java/com/bakdata/kafka/streams/kstream/SimpleProcessor.java b/kafka-streams-utils/src/test/java/com/bakdata/kafka/streams/kstream/SimpleProcessor.java new file mode 100644 index 0000000..877ecc8 --- /dev/null +++ b/kafka-streams-utils/src/test/java/com/bakdata/kafka/streams/kstream/SimpleProcessor.java @@ -0,0 +1,48 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka.streams.kstream; + +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; + +abstract class SimpleProcessor implements Processor { + private ProcessorContext context = null; + + @Override + public void init(final ProcessorContext context) { + this.context = context; + } + + protected void forward(final Record outputRecord) { + this.context.forward(outputRecord); + } + + protected S getStateStore(final String name) { + return this.context.getStateStore(name); + } + +} diff --git a/kafka-streams-utils/src/test/java/com/bakdata/kafka/streams/kstream/StoresXTest.java b/kafka-streams-utils/src/test/java/com/bakdata/kafka/streams/kstream/StoresXTest.java new file mode 100644 index 0000000..d0008dc --- /dev/null +++ b/kafka-streams-utils/src/test/java/com/bakdata/kafka/streams/kstream/StoresXTest.java @@ -0,0 +1,281 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka.streams.kstream; + +import com.bakdata.kafka.Configurator; +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.TimestampedWindowStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; +import org.apache.kafka.streams.state.WindowStore; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +class StoresXTest { + + @InjectSoftAssertions + private SoftAssertions softly; + + private static StoresX createStores() { + return new StoresX(new Configurator(Map.of())); + } + + @Test + void shouldCreateKeyValueStore() { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream input = + builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + input.process(new ProcessorSupplier<>() { + @Override + public Processor get() { + return new SimpleProcessor<>() { + @Override + public void process(final Record inputRecord) { + final KeyValueStore store = this.getStateStore("my-store"); + store.put(inputRecord.key(), inputRecord.value()); + } + }; + } + + @Override + public Set> stores() { + final StoreBuilder> store = createStores() + .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), Serdes.String(), + Serdes.String()); + return Set.of(store); + } + }); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build())) { + driver.createInputTopic("input", new StringSerializer(), new StringSerializer()) + .pipeInput("foo", "bar", 0); + final KeyValueStore store = driver.getKeyValueStore("my-store"); + this.softly.assertThat(store.get("foo")).isEqualTo("bar"); + } + } + + @Test + void shouldCreateSessionStore() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable, String> input = + builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())) + .groupByKey() + .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(60L))) + .reduce((v1, v2) -> v1 + v2); + input.toStream().process(new ProcessorSupplier<>() { + @Override + public Processor, String, String, String> get() { + return new SimpleProcessor<>() { + @Override + public void process(final Record, String> inputRecord) { + final SessionStore store = this.getStateStore("my-store"); + store.put(inputRecord.key(), inputRecord.value()); + } + }; + } + + @Override + public Set> stores() { + final StoreBuilder> store = createStores() + .sessionStoreBuilder(Stores.inMemorySessionStore("my-store", Duration.ofSeconds(60L)), + Serdes.String(), + Serdes.String()); + return Set.of(store); + } + }); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build())) { + driver.createInputTopic("input", new StringSerializer(), new StringSerializer()) + .pipeInput("foo", "bar", 0); + final SessionStore store = driver.getSessionStore("my-store"); + this.softly.assertThat(store.fetchSession("foo", 0, 60000)).isEqualTo("bar"); + } + } + + @Test + void shouldCreateTimestampedWindowStore() { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream input = + builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + input.process(new ProcessorSupplier<>() { + @Override + public Processor get() { + return new SimpleProcessor<>() { + @Override + public void process(final Record inputRecord) { + final TimestampedWindowStore store = this.getStateStore("my-store"); + store.put(inputRecord.key(), + ValueAndTimestamp.make(inputRecord.value(), inputRecord.timestamp()), + inputRecord.timestamp()); + } + }; + } + + @Override + public Set> stores() { + final StoreBuilder> store = createStores() + .timestampedWindowStoreBuilder( + Stores.inMemoryWindowStore("my-store", Duration.ofSeconds(60L), + Duration.ofSeconds(60L), false), + Serdes.String(), + Serdes.String()); + return Set.of(store); + } + }); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build())) { + driver.createInputTopic("input", new StringSerializer(), new StringSerializer()) + .pipeInput("foo", "bar", 0); + final WindowStore> store = driver.getTimestampedWindowStore("my-store"); + this.softly.assertThat(store.fetch("foo", 0L)).isEqualTo(ValueAndTimestamp.make("bar", 0L)); + } + } + + @Test + void shouldCreateWindowStore() { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream input = + builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + input.process(new ProcessorSupplier<>() { + @Override + public Processor get() { + return new SimpleProcessor<>() { + @Override + public void process(final Record inputRecord) { + final WindowStore store = this.getStateStore("my-store"); + store.put(inputRecord.key(), inputRecord.value(), inputRecord.timestamp()); + } + }; + } + + @Override + public Set> stores() { + final StoreBuilder> store = createStores() + .windowStoreBuilder(Stores.inMemoryWindowStore("my-store", Duration.ofSeconds(60L), + Duration.ofSeconds(60L), false), Serdes.String(), + Serdes.String()); + return Set.of(store); + } + }); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build())) { + driver.createInputTopic("input", new StringSerializer(), new StringSerializer()) + .pipeInput("foo", "bar", 0); + final WindowStore store = driver.getWindowStore("my-store"); + this.softly.assertThat(store.fetch("foo", 0L)).isEqualTo("bar"); + } + } + + @Test + void shouldCreateVersionedKeyValueStore() { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream input = + builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + input.process(new ProcessorSupplier<>() { + @Override + public Processor get() { + return new SimpleProcessor<>() { + @Override + public void process(final Record inputRecord) { + final VersionedKeyValueStore store = this.getStateStore("my-store"); + store.put(inputRecord.key(), inputRecord.value(), inputRecord.timestamp()); + } + }; + } + + @Override + public Set> stores() { + final StoreBuilder> store = createStores() + .versionedKeyValueStoreBuilder( + Stores.persistentVersionedKeyValueStore("my-store", Duration.ofSeconds(60L)), + Serdes.String(), + Serdes.String()); + return Set.of(store); + } + }); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build())) { + driver.createInputTopic("input", new StringSerializer(), new StringSerializer()) + .pipeInput("foo", "bar", 0); + final VersionedKeyValueStore store = driver.getVersionedKeyValueStore("my-store"); + this.softly.assertThat(store.get("foo", 0L)).isEqualTo(new VersionedRecord<>("bar", 0L)); + } + } + + @Test + void shouldCreateTimestampedKeyValueStore() { + final StreamsBuilder builder = new StreamsBuilder(); + final KStream input = + builder.stream("input", Consumed.with(Serdes.String(), Serdes.String())); + input.process(new ProcessorSupplier<>() { + @Override + public Processor get() { + return new SimpleProcessor<>() { + @Override + public void process(final Record inputRecord) { + final TimestampedKeyValueStore store = this.getStateStore("my-store"); + store.put(inputRecord.key(), + ValueAndTimestamp.make(inputRecord.value(), inputRecord.timestamp())); + } + }; + } + + @Override + public Set> stores() { + final StoreBuilder> store = createStores() + .timestampedKeyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"), + Serdes.String(), + Serdes.String()); + return Set.of(store); + } + }); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build())) { + driver.createInputTopic("input", new StringSerializer(), new StringSerializer()) + .pipeInput("foo", "bar", 0); + final KeyValueStore> store = + driver.getTimestampedKeyValueStore("my-store"); + this.softly.assertThat(store.get("foo")).isEqualTo(ValueAndTimestamp.make("bar", 0L)); + } + } +}