Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/streams/developer-guide/config-streams.html
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="heade
These optimizations include moving/reducing repartition topics and reusing the source topic as the changelog for source KTables. These optimizations will save on network traffic and storage in Kafka without changing the semantics of your applications. Enabling them is recommended.
</p>
<p>
Note that as of 2.3, you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
Note that you need to do two things to enable optimizations. In addition to setting this config to <code>StreamsConfig.OPTIMIZE</code>, you'll need to pass in your
configuration properties when building your topology by using the overloaded <code>StreamsBuilder.build(Properties)</code> method.
For example <code>KafkaStreams myStream = new KafkaStreams(streamsBuilder.build(properties), properties)</code>.
</p>
Expand All @@ -1235,7 +1235,7 @@ <h4><a class="toc-backref" href="#id31">topology.optimization</a><a class="heade
The version you are upgrading from. It is important to set this config when performing a rolling upgrade to certain versions, as described in the upgrade guide.
You should set this config to the appropriate version before bouncing your instances and upgrading them to the newer version. Once everyone is on the
newer version, you should remove this config and do a second rolling bounce. It is only necessary to set this config and follow the two-bounce upgrade path
when upgrading from below version 2.0, or when upgrading to 2.4+ from any version lower than 2.4.
when upgrading to 3.4+ from any version lower than 3.4.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to 3.4 to align with the upgrade guide. if it's incorrect, please let me know, I will fix it

</div>
</blockquote>
</div>
Expand Down
739 changes: 6 additions & 733 deletions docs/streams/upgrade-guide.html

Large diffs are not rendered by default.

60 changes: 0 additions & 60 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,66 +287,6 @@ public class StreamsConfig extends AbstractConfig {
OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS,
SINGLE_STORE_SELF_JOIN);

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0100 = UpgradeFromValues.UPGRADE_FROM_0100.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0101 = UpgradeFromValues.UPGRADE_FROM_0101.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0102 = UpgradeFromValues.UPGRADE_FROM_0102.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0110 = UpgradeFromValues.UPGRADE_FROM_0110.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_10 = UpgradeFromValues.UPGRADE_FROM_10.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_11 = UpgradeFromValues.UPGRADE_FROM_11.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_20 = UpgradeFromValues.UPGRADE_FROM_20.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_21 = UpgradeFromValues.UPGRADE_FROM_21.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.2.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_22 = UpgradeFromValues.UPGRADE_FROM_22.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.3.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_23 = UpgradeFromValues.UPGRADE_FROM_23.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.4.x}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@
package org.apache.kafka.streams.internals;

public enum UpgradeFromValues {
UPGRADE_FROM_0100("0.10.0"),
UPGRADE_FROM_0101("0.10.1"),
UPGRADE_FROM_0102("0.10.2"),
UPGRADE_FROM_0110("0.11.0"),
UPGRADE_FROM_10("1.0"),
UPGRADE_FROM_11("1.1"),
UPGRADE_FROM_20("2.0"),
UPGRADE_FROM_21("2.1"),
UPGRADE_FROM_22("2.2"),
UPGRADE_FROM_23("2.3"),
UPGRADE_FROM_24("2.4"),
UPGRADE_FROM_25("2.5"),
UPGRADE_FROM_26("2.6"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,6 @@ private static boolean isUpgrade(final Map<String, ?> configs) {
}

switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,6 @@ private boolean isNotUpgrade(final Map<String, ?> configs) {
}

switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,6 @@ private static boolean upgradeFromV0(final Map<String, ?> configs) {
}

switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
Expand Down Expand Up @@ -59,8 +58,6 @@ public AssignorConfiguration(final Map<String, ?> configs) {
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());

validateUpgradeFrom();

{
final Object o = configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
if (o == null) {
Expand Down Expand Up @@ -94,32 +91,6 @@ public ReferenceContainer referenceContainer() {
return referenceContainer;
}

// cooperative rebalancing was introduced in 2.4 and the old protocol (eager rebalancing) was removed
// in 4.0, meaning live upgrades from 2.3 or below to 4.0+ are no longer possible without a bridge release
public void validateUpgradeFrom() {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (UpgradeFromValues.fromString(upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
final String errMsg = String.format(
"The eager rebalancing protocol is no longer supported in 4.0 which means live upgrades from 2.3 or below are not possible."
+ " Please see the Streams upgrade guide for the bridge releases and recommended upgrade path. Got upgrade.from='%s'", upgradeFrom);
log.error(errMsg);
throw new ConfigException(errMsg);

}
}
}

public String logPrefix() {
return logPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,28 +570,6 @@ public void shouldInterleaveTasksByGroupIdDuringNewAssignment(final Map<String,
assertThat(interleavedTaskIds, equalTo(assignment));
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldThrowOnEagerSubscription(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2");

final Set<TaskId> prevTasks = Set.of(
new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)
);
final Set<TaskId> standbyTasks = Set.of(
new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)
);

createMockTaskManager(prevTasks, standbyTasks);
assertThrows(
ConfigException.class,
() -> configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig)
);
}

@ParameterizedTest
@MethodSource("parameter")
public void testCooperativeSubscription(final Map<String, Object> parameterizedConfig) {
Expand Down