, ...}.
- * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
- * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper) mapValues()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
- * the processing progress can be observed and additional periodic actions can be performed.
- *
- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
- * access to global state stores is available by default).
- *
- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
- *
{@code
- * // create store
- * StoreBuilder> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
- * public ValueTransformer get() {
- * return new MyValueTransformer();
- * }
- * }, "myValueTransformState");
- * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
- * {@code
- * class MyValueTransformerSupplier implements ValueTransformerSupplier {
- * // supply transformer
- * ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- *
- * // provide store(s) that will be added and connected to the associated transformer
- * // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
- * Set stores() {
- * StoreBuilder> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.flatTransformValues(new MyValueTransformer());
- * }
- *
- * With either strategy, within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
- * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
- * transform()}.
- * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
- * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
- * No additional {@link KeyValue} pairs can be emitted via
- * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
- * emit a {@link KeyValue} pair.
- *
{@code
- * class MyValueTransformer implements ValueTransformer {
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
- * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * Iterable transform(V value) {
- * // can access this.state
- * List result = new ArrayList<>();
- * for (int i = 0; i < 3; i++) {
- * result.add(new NewValueType(value));
- * }
- * return result; // values
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered.
- * If repartitioning is required, a call to {@link #repartition()} should be performed before
- * {@code flatTransformValues()}.
- *
- * Setting a new value preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}.
- *
- * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a newly constructed {@link ValueTransformer}
- * The supplier should always generate a new instance. Creating a single {@link ValueTransformer} object
- * and returning the same object reference in {@link ValueTransformer} is a
- * violation of the supplier pattern and leads to runtime exceptions.
- * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
- * implements {@link ConnectedStoreProvider#stores()}
- * @param the value type of the result stream
- * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
- * different type)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
- */
- @Deprecated
- KStream flatTransformValues(final ValueTransformerSupplier super V, Iterable> valueTransformerSupplier,
- final String... stateStoreNames);
-
- /**
- * Transform the value of each input record into zero or more new values (with possibly a new
- * type) and emit for each new value a record with the same key of the input record and the value.
- * A {@link ValueTransformer} (provided by the given {@link ValueTransformerSupplier}) is applied to each input
- * record value and computes zero or more new values.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #mapValues(ValueMapper) mapValues()}).
- * If you choose not to attach one, this operation is similar to the stateless {@link #mapValues(ValueMapper) mapValues()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) Punctuator#punctuate()}
- * the processing progress can be observed and additional periodic actions can be performed.
- *
- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
- * access to global state stores is available by default).
- *
- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
- *
{@code
- * // create store
- * StoreBuilder> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerSupplier() {
- * public ValueTransformer get() {
- * return new MyValueTransformer();
- * }
- * }, "myValueTransformState");
- * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
- * {@code
- * class MyValueTransformerSupplier implements ValueTransformerSupplier {
- * // supply transformer
- * ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- *
- * // provide store(s) that will be added and connected to the associated transformer
- * // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
- * Set stores() {
- * StoreBuilder> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.flatTransformValues(new MyValueTransformer());
- * }
- *
- * With either strategy, within the {@link ValueTransformer}, the state is obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformer} must return an {@link java.lang.Iterable} type (e.g., any
- * {@link java.util.Collection} type) in {@link ValueTransformer#transform(Object)
- * transform()}.
- * If the return value of {@link ValueTransformer#transform(Object) ValueTransformer#transform()} is an empty
- * {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
- * No additional {@link KeyValue} pairs can be emitted via
- * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformer} tries to
- * emit a {@link KeyValue} pair.
- *
{@code
- * class MyValueTransformer implements ValueTransformer {
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
- * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * Iterable transform(V value) {
- * // can access this.state
- * List result = new ArrayList<>();
- * for (int i = 0; i < 3; i++) {
- * result.add(new NewValueType(value));
- * }
- * return result; // values
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered.
- * If repartitioning is required, a call to {@link #repartition()} should be performed before
- * {@code flatTransformValues()}.
- *
- * Setting a new value preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}.
- *
- * @param valueTransformerSupplier an instance of {@link ValueTransformerSupplier} that generates a newly constructed {@link ValueTransformer}
- * The supplier should always generate a new instance. Creating a single {@link ValueTransformer} object
- * and returning the same object reference in {@link ValueTransformer} is a
- * violation of the supplier pattern and leads to runtime exceptions.
- * @param named a {@link Named} config used to name the processor in the topology
- * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
- * implements {@link ConnectedStoreProvider#stores()}
- * @param the value type of the result stream
- * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
- * different type)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
- */
- @Deprecated
- KStream flatTransformValues(final ValueTransformerSupplier super V, Iterable> valueTransformerSupplier,
- final Named named,
- final String... stateStoreNames);
-
- /**
- * Transform the value of each input record into zero or more new values (with possibly a new
- * type) and emit for each new value a record with the same key of the input record and the value.
- * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
- * each input record value and computes zero or more new values.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
- * If you choose not to attach one, this operation is similar to the stateless {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can
- * be observed and additional periodic actions can be performed.
- *
- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
- * access to global state stores is available by default).
- *
- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
- *
{@code
- * // create store
- * StoreBuilder> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
- * public ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- * }, "myValueTransformState");
- * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
- * {@code
- * class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
- * // supply transformer
- * ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- *
- * // provide store(s) that will be added and connected to the associated transformer
- * // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
- * Set stores() {
- * StoreBuilder> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKey());
- * }
- *
- * With either strategy, within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any
- * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
- * transform()}.
- * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
- * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
- * No additional {@link KeyValue} pairs can be emitted via
- * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
- * to emit a {@link KeyValue} pair.
- *
{@code
- * class MyValueTransformerWithKey implements ValueTransformerWithKey {
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
- * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * Iterable transform(K readOnlyKey, V value) {
- * // can access this.state and use read-only key
- * List result = new ArrayList<>();
- * for (int i = 0; i < 3; i++) {
- * result.add(new NewValueType(readOnlyKey));
- * }
- * return result; // values
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered.
- * If repartitioning is required, a call to {@link #repartition()} should be performed before
- * {@code flatTransformValues()}.
- *
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
- * So, setting a new value preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}.
- *
- * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a newly constructed {@link ValueTransformerWithKey}
- * The supplier should always generate a new instance. Creating a single {@link ValueTransformerWithKey} object
- * and returning the same object reference in {@link ValueTransformerWithKey} is a
- * violation of the supplier pattern and leads to runtime exceptions.
- * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
- * implements {@link ConnectedStoreProvider#stores()}
- * @param the value type of the result stream
- * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
- * different type)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, String...)} instead.
- */
- @Deprecated
- KStream flatTransformValues(final ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerSupplier,
- final String... stateStoreNames);
-
- /**
- * Transform the value of each input record into zero or more new values (with possibly a new
- * type) and emit for each new value a record with the same key of the input record and the value.
- * A {@link ValueTransformerWithKey} (provided by the given {@link ValueTransformerWithKeySupplier}) is applied to
- * each input record value and computes zero or more new values.
- * Thus, an input record {@code } can be transformed into output records {@code , , ...}.
- * Attaching a state store makes this a stateful record-by-record operation (cf. {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}).
- * If you choose not to attach one, this operation is similar to the stateless {@link #flatMapValues(ValueMapperWithKey) flatMapValues()}
- * but allows access to the {@code ProcessorContext} and record metadata.
- * This is essentially mixing the Processor API into the DSL, and provides all the functionality of the PAPI.
- * Furthermore, via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing progress can
- * be observed and additional periodic actions can be performed.
- *
- * In order for the transformer to use state stores, the stores must be added to the topology and connected to the
- * transformer using at least one of two strategies (though it's not required to connect global state stores; read-only
- * access to global state stores is available by default).
- *
- * The first strategy is to manually add the {@link StoreBuilder}s via {@link Topology#addStateStore(StoreBuilder, String...)},
- * and specify the store names via {@code stateStoreNames} so they will be connected to the transformer.
- *
{@code
- * // create store
- * StoreBuilder> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * // add store
- * builder.addStateStore(keyValueStoreBuilder);
- *
- * KStream outputStream = inputStream.flatTransformValues(new ValueTransformerWithKeySupplier() {
- * public ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- * }, "myValueTransformState");
- * }
- * The second strategy is for the given {@link ValueTransformerSupplier} to implement {@link ConnectedStoreProvider#stores()},
- * which provides the {@link StoreBuilder}s to be automatically added to the topology and connected to the transformer.
- * {@code
- * class MyValueTransformerWithKeySupplier implements ValueTransformerWithKeySupplier {
- * // supply transformer
- * ValueTransformerWithKey get() {
- * return new MyValueTransformerWithKey();
- * }
- *
- * // provide store(s) that will be added and connected to the associated transformer
- * // the store name from the builder ("myValueTransformState") is used to access the store later via the ProcessorContext
- * Set stores() {
- * StoreBuilder> keyValueStoreBuilder =
- * Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
- * Serdes.String(),
- * Serdes.String());
- * return Collections.singleton(keyValueStoreBuilder);
- * }
- * }
- *
- * ...
- *
- * KStream outputStream = inputStream.flatTransformValues(new MyValueTransformerWithKey());
- * }
- *
- * With either strategy, within the {@link ValueTransformerWithKey}, the state is obtained via the {@link ProcessorContext}.
- * To trigger periodic actions via {@link org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
- * a schedule must be registered.
- * The {@link ValueTransformerWithKey} must return an {@link java.lang.Iterable} type (e.g., any
- * {@link java.util.Collection} type) in {@link ValueTransformerWithKey#transform(Object, Object)
- * transform()}.
- * If the return value of {@link ValueTransformerWithKey#transform(Object, Object) ValueTransformerWithKey#transform()}
- * is an empty {@link java.lang.Iterable Iterable} or {@code null}, no records are emitted.
- * No additional {@link KeyValue} pairs can be emitted via
- * {@link org.apache.kafka.streams.processor.ProcessorContext#forward(Object, Object) ProcessorContext.forward()}.
- * A {@link org.apache.kafka.streams.errors.StreamsException} is thrown if the {@link ValueTransformerWithKey} tries
- * to emit a {@link KeyValue} pair.
- *
{@code
- * class MyValueTransformerWithKey implements ValueTransformerWithKey {
- * private StateStore state;
- *
- * void init(ProcessorContext context) {
- * this.state = context.getStateStore("myValueTransformState");
- * // punctuate each second, can access this.state
- * context.schedule(Duration.ofSeconds(1), PunctuationType.WALL_CLOCK_TIME, new Punctuator(..));
- * }
- *
- * Iterable transform(K readOnlyKey, V value) {
- * // can access this.state and use read-only key
- * List result = new ArrayList<>();
- * for (int i = 0; i < 3; i++) {
- * result.add(new NewValueType(readOnlyKey));
- * }
- * return result; // values
- * }
- *
- * void close() {
- * // can access this.state
- * }
- * }
- * }
- * Even if any upstream operation was key-changing, no auto-repartition is triggered.
- * If repartitioning is required, a call to {@link #repartition()} should be performed before
- * {@code flatTransformValues()}.
- *
- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning.
- * So, setting a new value preserves data co-location with respect to the key.
- * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join)
- * is applied to the result {@code KStream}.
- *
- * @param valueTransformerSupplier an instance of {@link ValueTransformerWithKeySupplier} that generates a newly constructed {@link ValueTransformerWithKey}
- * The supplier should always generate a new instance. Creating a single {@link ValueTransformerWithKey} object
- * and returning the same object reference in {@link ValueTransformerWithKey} is a
- * violation of the supplier pattern and leads to runtime exceptions.
- * @param named a {@link Named} config used to name the processor in the topology
- * @param stateStoreNames the names of the state stores used by the processor; not required if the supplier
- * implements {@link ConnectedStoreProvider#stores()}
- * @param the value type of the result stream
- * @return a {@code KStream} that contains more or less records with unmodified key and new values (possibly of
- * different type)
- * @see #mapValues(ValueMapper)
- * @see #mapValues(ValueMapperWithKey)
- * @deprecated Since 3.3. Use {@link KStream#processValues(FixedKeyProcessorSupplier, Named, String...)} instead.
- */
- @Deprecated
- KStream flatTransformValues(final ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerSupplier,
- final Named named,
- final String... stateStoreNames);
/**
* Process all records in this stream, one record at a time, by applying a
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 2483cbbbe1673..91a93d23a07e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,17 +17,12 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
-import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.state.StoreBuilder;
import java.util.Collection;
import java.util.HashSet;
@@ -109,40 +104,6 @@ static ValueMapperWithKey withKey(final ValueMapper
return (readOnlyKey, value) -> valueMapper.apply(value);
}
- @SuppressWarnings("deprecation")
- static ValueTransformerWithKeySupplier toValueTransformerWithKeySupplier(
- final org.apache.kafka.streams.kstream.ValueTransformerSupplier valueTransformerSupplier) {
- Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
- ApiUtils.checkSupplier(valueTransformerSupplier);
- return new ValueTransformerWithKeySupplier() {
- @Override
- public ValueTransformerWithKey get() {
- final org.apache.kafka.streams.kstream.ValueTransformer valueTransformer = valueTransformerSupplier.get();
- return new ValueTransformerWithKey() {
- @Override
- public void init(final ProcessorContext context) {
- valueTransformer.init(context);
- }
-
- @Override
- public VR transform(final K readOnlyKey, final V value) {
- return valueTransformer.transform(value);
- }
-
- @Override
- public void close() {
- valueTransformer.close();
- }
- };
- }
-
- @Override
- public Set> stores() {
- return valueTransformerSupplier.stores();
- }
- };
- }
-
static ValueJoinerWithKey toValueJoinerWithKey(final ValueJoiner valueJoiner) {
Objects.requireNonNull(valueJoiner, "joiner can't be null");
return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2);
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
deleted file mode 100644
index 5ce059990630a..0000000000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransform.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.api.ContextualProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.state.StoreBuilder;
-
-import java.util.Set;
-
-public class KStreamFlatTransform implements ProcessorSupplier {
-
- @SuppressWarnings("deprecation")
- private final org.apache.kafka.streams.kstream.TransformerSupplier super KIn, ? super VIn, Iterable>> transformerSupplier;
-
- @SuppressWarnings("deprecation")
- public KStreamFlatTransform(final org.apache.kafka.streams.kstream.TransformerSupplier super KIn, ? super VIn, Iterable>> transformerSupplier) {
- this.transformerSupplier = transformerSupplier;
- }
-
- @Override
- public Processor get() {
- return new KStreamFlatTransformProcessor<>(transformerSupplier.get());
- }
-
- @Override
- public Set> stores() {
- return transformerSupplier.stores();
- }
-
- public static class KStreamFlatTransformProcessor extends ContextualProcessor {
-
- @SuppressWarnings("deprecation")
- private final org.apache.kafka.streams.kstream.Transformer super KIn, ? super VIn, Iterable>> transformer;
-
- @SuppressWarnings("deprecation")
- public KStreamFlatTransformProcessor(final org.apache.kafka.streams.kstream.Transformer super KIn, ? super VIn, Iterable>> transformer) {
- this.transformer = transformer;
- }
-
- @Override
- public void init(final ProcessorContext context) {
- super.init(context);
- transformer.init((InternalProcessorContext) context);
- }
-
- @Override
- public void process(final Record record) {
- final Iterable> pairs = transformer.transform(record.key(), record.value());
- if (pairs != null) {
- for (final KeyValue pair : pairs) {
- context().forward(record.withKey(pair.key).withValue(pair.value));
- }
- }
- }
-
- @Override
- public void close() {
- transformer.close();
- }
- }
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
deleted file mode 100644
index 5469c668dfee2..0000000000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValues.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.processor.api.ContextualProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-import org.apache.kafka.streams.state.StoreBuilder;
-
-import java.util.Set;
-
-public class KStreamFlatTransformValues implements ProcessorSupplier {
-
- private final ValueTransformerWithKeySupplier> valueTransformerSupplier;
-
- public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier> valueTransformerWithKeySupplier) {
- this.valueTransformerSupplier = valueTransformerWithKeySupplier;
- }
-
- @Override
- public Processor get() {
- return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
- }
-
- @Override
- public Set> stores() {
- return valueTransformerSupplier.stores();
- }
-
- public static class KStreamFlatTransformValuesProcessor extends ContextualProcessor {
-
- private final ValueTransformerWithKey> valueTransformer;
-
- KStreamFlatTransformValuesProcessor(final ValueTransformerWithKey> valueTransformer) {
- this.valueTransformer = valueTransformer;
- }
-
- @Override
- public void init(final ProcessorContext context) {
- super.init(context);
- valueTransformer.init(new ForwardingDisabledProcessorContext((InternalProcessorContext) context));
- }
-
- @Override
- public void process(final Record record) {
- final Iterable transformedValues = valueTransformer.transform(record.key(), record.value());
- if (transformedValues != null) {
- for (final VOut transformedValue : transformedValues) {
- context().forward(record.withValue(transformedValue));
- }
- }
- }
-
- @Override
- public void close() {
- super.close();
- valueTransformer.close();
- }
- }
-
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 820c31f29e43e..ab27cfc1ea19d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -41,7 +41,6 @@
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode.BaseRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
@@ -121,8 +120,6 @@ public class KStreamImpl extends AbstractStream implements KStream KStream doStreamTableJoin(final KTable table,
builder);
}
- @Override
- @Deprecated
- public KStream flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier super V, Iterable> valueTransformerSupplier,
- final String... stateStoreNames) {
- Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
- return doFlatTransformValues(
- toValueTransformerWithKeySupplier(valueTransformerSupplier),
- NamedInternal.empty(),
- stateStoreNames);
- }
-
- @Override
- @Deprecated
- public KStream flatTransformValues(final org.apache.kafka.streams.kstream.ValueTransformerSupplier super V, Iterable> valueTransformerSupplier,
- final Named named,
- final String... stateStoreNames) {
- Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
- return doFlatTransformValues(
- toValueTransformerWithKeySupplier(valueTransformerSupplier),
- named,
- stateStoreNames);
- }
-
- @Override
- @Deprecated
- public KStream flatTransformValues(final ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerSupplier,
- final String... stateStoreNames) {
- Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
- return doFlatTransformValues(valueTransformerSupplier, NamedInternal.empty(), stateStoreNames);
- }
-
- @Override
- @Deprecated
- public KStream flatTransformValues(final ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerSupplier,
- final Named named,
- final String... stateStoreNames) {
- Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
- return doFlatTransformValues(valueTransformerSupplier, named, stateStoreNames);
- }
-
- private KStream doFlatTransformValues(final ValueTransformerWithKeySupplier super K, ? super V, Iterable> valueTransformerWithKeySupplier,
- final Named named,
- final String... stateStoreNames) {
- Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
- for (final String stateStoreName : stateStoreNames) {
- Objects.requireNonNull(stateStoreName, "stateStoreNames can't contain `null` as store name");
- }
- ApiUtils.checkSupplier(valueTransformerWithKeySupplier);
-
- final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
- final StatefulProcessorNode super K, ? super V> transformNode = new StatefulProcessorNode<>(
- name,
- new ProcessorParameters<>(new KStreamFlatTransformValues<>(valueTransformerWithKeySupplier), name),
- stateStoreNames);
- transformNode.setValueChangingOperation(true);
-
- builder.addGraphNode(graphNode, transformNode);
-
- // cannot inherit value serde
- return new KStreamImpl<>(
- name,
- keySerde,
- null,
- subTopologySourceNodes,
- repartitionRequired,
- transformNode,
- builder);
- }
-
@Override
@Deprecated
public void process(final org.apache.kafka.streams.processor.ProcessorSupplier super K, ? super V> processorSupplier,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
index 108a7d7233bf3..ad3a834257d1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ConnectedStoreProvider.java
@@ -19,7 +19,6 @@
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
@@ -92,11 +91,8 @@
* @see Topology#addProcessor(String, org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...)
* @see KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, Named, String...)
- * @see KStream#processValues(FixedKeyProcessorSupplier, String...)
- * @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, String...)
- * @see KStream#flatTransformValues(org.apache.kafka.streams.kstream.ValueTransformerSupplier, Named, String...)
- * @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, String...)
- * @see KStream#flatTransformValues(ValueTransformerWithKeySupplier, Named, String...)
+ * @see KStream#processValues(FixedKeyProcessorSupplier, String...)
+ * @see KStream#processValues(FixedKeyProcessorSupplier, Named, String...)
*/
public interface ConnectedStoreProvider {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
index 6c973e096fc27..1dffc4ebbd3ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
@@ -119,7 +119,8 @@ private Producer producer() {
}
public void reInitializeProducer() {
- streamsProducer.resetProducer(producer());
+ if (!streamsProducer.isClosed())
+ streamsProducer.resetProducer(producer());
}
StreamsProducer streamsProducer() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index 1048b5a2ecfdf..546186b2dbd15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -70,6 +70,7 @@ public class StreamsProducer {
private Producer producer;
private boolean transactionInFlight = false;
private boolean transactionInitialized = false;
+ private boolean closed = false;
private double oldProducerTotalBlockedTime = 0;
// we have a single `StreamsProducer` per thread, and thus a single `sendException` instance,
// which we share across all tasks, ie, all `RecordCollectorImpl`
@@ -98,6 +99,10 @@ boolean transactionInFlight() {
return transactionInFlight;
}
+ boolean isClosed() {
+ return closed;
+ }
+
/**
* @throws IllegalStateException if EOS is disabled
*/
@@ -320,6 +325,7 @@ void flush() {
void close() {
producer.close();
+ closed = true;
transactionInFlight = false;
transactionInitialized = false;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 0c9031afbdfc6..056721fa8af1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -61,12 +61,11 @@
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper;
import org.apache.kafka.streams.utils.TestUtils.RecordingProcessorWrapper.WrapperRecorder;
+import org.apache.kafka.test.MockApiFixedKeyProcessorSupplier;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockPredicate;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.NoopValueTransformer;
-import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
@@ -1315,29 +1314,21 @@ public void shouldUseSpecifiedNameForProcessOperation() {
}
@Test
- public void shouldUseSpecifiedNameForPrintOperation() {
- builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor"));
- builder.build();
- final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
- assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "print-processor");
- }
+ public void shouldUseSpecifiedNameForProcessValuesOperation() {
+ builder.stream(STREAM_TOPIC)
+ .processValues(new MockApiFixedKeyProcessorSupplier<>(), Named.as("test-fixed-key-processor"));
- @Test
- @SuppressWarnings("deprecation")
- public void shouldUseSpecifiedNameForFlatTransformValueOperation() {
- builder.stream(STREAM_TOPIC).flatTransformValues(() -> new NoopValueTransformer<>(), Named.as(STREAM_OPERATION_NAME));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
- assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "test-fixed-key-processor");
}
@Test
- @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
- public void shouldUseSpecifiedNameForFlatTransformValueWithKeyOperation() {
- builder.stream(STREAM_TOPIC).flatTransformValues(() -> new NoopValueTransformerWithKey(), Named.as(STREAM_OPERATION_NAME));
+ public void shouldUseSpecifiedNameForPrintOperation() {
+ builder.stream(STREAM_TOPIC).print(Printed.toSysOut().withName("print-processor"));
builder.build();
final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).buildTopology();
- assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", STREAM_OPERATION_NAME);
+ assertNamesForOperation(topology, "KSTREAM-SOURCE-0000000000", "print-processor");
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index ca4fd756cbc1b..01e833f1b976b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -32,7 +32,6 @@
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.MockApiProcessorSupplier;
-import org.apache.kafka.test.NoopValueTransformer;
import org.apache.kafka.test.NoopValueTransformerWithKey;
import org.junit.jupiter.api.Test;
@@ -51,21 +50,6 @@
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class AbstractStreamTest {
- @SuppressWarnings("deprecation")
- @Test
- public void testToInternalValueTransformerSupplierSuppliesNewTransformers() {
- final org.apache.kafka.streams.kstream.ValueTransformerSupplier, ?> valueTransformerSupplier =
- mock(org.apache.kafka.streams.kstream.ValueTransformerSupplier.class);
- when(valueTransformerSupplier.get())
- .thenReturn(new NoopValueTransformer<>())
- .thenReturn(new NoopValueTransformer<>());
- final ValueTransformerWithKeySupplier, ?, ?> valueTransformerWithKeySupplier =
- AbstractStream.toValueTransformerWithKeySupplier(valueTransformerSupplier);
- valueTransformerWithKeySupplier.get();
- valueTransformerWithKeySupplier.get();
- valueTransformerWithKeySupplier.get();
- }
-
@Test
public void testToInternalValueTransformerWithKeySupplierSuppliesNewTransformers() {
final ValueTransformerWithKeySupplier, ?, ?> valueTransformerWithKeySupplier =
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
deleted file mode 100644
index 5335128aa459d..0000000000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformTest.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform.KStreamFlatTransformProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import java.util.Arrays;
-import java.util.Collections;
-
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-@SuppressWarnings("deprecation")
-public class KStreamFlatTransformTest {
-
- private Number inputKey;
- private Number inputValue;
-
- @Mock
- private org.apache.kafka.streams.kstream.Transformer>> transformer;
- @Mock
- private InternalProcessorContext context;
- private InOrder inOrder;
-
- private KStreamFlatTransformProcessor processor;
-
- @BeforeEach
- public void setUp() {
- inputKey = 1;
- inputValue = 10;
- inOrder = inOrder(context);
- processor = new KStreamFlatTransformProcessor<>(transformer);
- }
-
- @Test
- public void shouldInitialiseFlatTransformProcessor() {
- processor.init(context);
-
- verify(transformer).init(context);
- }
-
- @Test
- public void shouldTransformInputRecordToMultipleOutputRecords() {
- final Iterable> outputRecords = Arrays.asList(
- KeyValue.pair(2, 20),
- KeyValue.pair(3, 30),
- KeyValue.pair(4, 40));
-
- processor.init(context);
-
- when(transformer.transform(inputKey, inputValue)).thenReturn(outputRecords);
-
- processor.process(new Record<>(inputKey, inputValue, 0L));
-
- for (final KeyValue outputRecord : outputRecords) {
- inOrder.verify(context).forward(new Record<>(outputRecord.key, outputRecord.value, 0L));
- }
- }
-
- @Test
- public void shouldAllowEmptyListAsResultOfTransform() {
- processor.init(context);
-
- when(transformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
-
- processor.process(new Record<>(inputKey, inputValue, 0L));
-
- inOrder.verify(context, never()).forward(ArgumentMatchers.>any());
- }
-
- @Test
- public void shouldAllowNullAsResultOfTransform() {
- processor.init(context);
-
- when(transformer.transform(inputKey, inputValue)).thenReturn(null);
-
- processor.process(new Record<>(inputKey, inputValue, 0L));
-
- inOrder.verify(context, never()).forward(ArgumentMatchers.>any());
- }
-
- @Test
- public void shouldCloseFlatTransformProcessor() {
- processor.close();
-
- verify(transformer).close();
- }
-
- @Test
- public void shouldGetFlatTransformProcessor() {
- @SuppressWarnings("unchecked")
- final org.apache.kafka.streams.kstream.TransformerSupplier>> transformerSupplier =
- mock(org.apache.kafka.streams.kstream.TransformerSupplier.class);
- final KStreamFlatTransform processorSupplier =
- new KStreamFlatTransform<>(transformerSupplier);
-
- when(transformerSupplier.get()).thenReturn(transformer);
-
- final Processor processor = processorSupplier.get();
-
- assertInstanceOf(KStreamFlatTransformProcessor.class, processor);
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
deleted file mode 100644
index 50a636f349db3..0000000000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatTransformValuesTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues.KStreamFlatTransformValuesProcessor;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.Record;
-import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.ArgumentMatchers;
-import org.mockito.InOrder;
-import org.mockito.Mock;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-
-import java.util.Arrays;
-import java.util.Collections;
-
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.STRICT_STUBS)
-public class KStreamFlatTransformValuesTest {
-
- private Integer inputKey;
- private Integer inputValue;
-
- @Mock
- private ValueTransformerWithKey> valueTransformer;
- @Mock
- private InternalProcessorContext context;
- private InOrder inOrder;
-
- private KStreamFlatTransformValuesProcessor processor;
-
- @BeforeEach
- public void setUp() {
- inputKey = 1;
- inputValue = 10;
- inOrder = inOrder(context);
- processor = new KStreamFlatTransformValuesProcessor<>(valueTransformer);
- }
-
- @Test
- public void shouldInitializeFlatTransformValuesProcessor() {
- processor.init(context);
-
- verify(valueTransformer).init(ArgumentMatchers.isA(ForwardingDisabledProcessorContext.class));
- }
-
- @Test
- public void shouldTransformInputRecordToMultipleOutputValues() {
- final Iterable outputValues = Arrays.asList(
- "Hello",
- "Blue",
- "Planet");
-
- processor.init(context);
-
- when(valueTransformer.transform(inputKey, inputValue)).thenReturn(outputValues);
-
- processor.process(new Record<>(inputKey, inputValue, 0L));
-
- for (final String outputValue : outputValues) {
- inOrder.verify(context).forward(new Record<>(inputKey, outputValue, 0L));
- }
- }
-
- @Test
- public void shouldEmitNoRecordIfTransformReturnsEmptyList() {
- processor.init(context);
-
- when(valueTransformer.transform(inputKey, inputValue)).thenReturn(Collections.emptyList());
-
- processor.process(new Record<>(inputKey, inputValue, 0L));
-
- inOrder.verify(context, never()).forward(ArgumentMatchers.>any());
- }
-
- @Test
- public void shouldEmitNoRecordIfTransformReturnsNull() {
- processor.init(context);
-
- when(valueTransformer.transform(inputKey, inputValue)).thenReturn(null);
-
- processor.process(new Record<>(inputKey, inputValue, 0L));
-
- inOrder.verify(context, never()).forward(ArgumentMatchers.>any());
- }
-
- @Test
- public void shouldCloseFlatTransformValuesProcessor() {
- processor.close();
-
- verify(valueTransformer).close();
- }
-
- @Test
- public void shouldGetFlatTransformValuesProcessor() {
- @SuppressWarnings("unchecked")
- final ValueTransformerWithKeySupplier> valueTransformerSupplier =
- mock(ValueTransformerWithKeySupplier.class);
- final KStreamFlatTransformValues processorSupplier =
- new KStreamFlatTransformValues<>(valueTransformerSupplier);
-
- when(valueTransformerSupplier.get()).thenReturn(valueTransformer);
-
- final Processor processor = processorSupplier.get();
-
- assertInstanceOf(KStreamFlatTransformValuesProcessor.class, processor);
- }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index b78696f259ade..8d2a280be4e8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -48,8 +48,6 @@
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;
@@ -113,33 +111,6 @@ public class KStreamImplTest {
private final Consumed stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
private final MockApiProcessorSupplier processorSupplier = new MockApiProcessorSupplier<>();
private final MockApiFixedKeyProcessorSupplier fixedKeyProcessorSupplier = new MockApiFixedKeyProcessorSupplier<>();
- @SuppressWarnings("deprecation")
- private final org.apache.kafka.streams.kstream.ValueTransformerSupplier> flatValueTransformerSupplier =
- () -> new org.apache.kafka.streams.kstream.ValueTransformer>() {
- @Override
- public void init(final ProcessorContext context) {}
-
- @Override
- public Iterable transform(final String value) {
- return Collections.singleton(value);
- }
-
- @Override
- public void close() {}
- };
- private final ValueTransformerWithKeySupplier> flatValueTransformerWithKeySupplier =
- () -> new ValueTransformerWithKey>() {
- @Override
- public void init(final ProcessorContext context) {}
-
- @Override
- public Iterable transform(final String key, final String value) {
- return Collections.singleton(value);
- }
-
- @Override
- public void close() {}
- };
private StreamsBuilder builder;
private KStream testStream;
@@ -1619,230 +1590,6 @@ public void shouldNotAllowBadProcessSupplierOnProcessValuesWithNamedAndStores()
assertThat(exception.getMessage(), containsString("#get() must return a new object each time it is called."));
}
- @Test
- @SuppressWarnings("deprecation")
- public void shouldNotAllowNullValueTransformerSupplierOnFlatTransformValues() {
- final NullPointerException exception = assertThrows(
- NullPointerException.class,
- () -> testStream.flatTransformValues((org.apache.kafka.streams.kstream.ValueTransformerSupplier