Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 0 additions & 60 deletions streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,66 +287,6 @@ public class StreamsConfig extends AbstractConfig {
OPTIMIZE, NO_OPTIMIZATION, REUSE_KTABLE_SOURCE_TOPICS, MERGE_REPARTITION_TOPICS,
SINGLE_STORE_SELF_JOIN);

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0100 = UpgradeFromValues.UPGRADE_FROM_0100.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0101 = UpgradeFromValues.UPGRADE_FROM_0101.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.2.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0102 = UpgradeFromValues.UPGRADE_FROM_0102.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.11.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_0110 = UpgradeFromValues.UPGRADE_FROM_0110.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_10 = UpgradeFromValues.UPGRADE_FROM_10.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 1.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_11 = UpgradeFromValues.UPGRADE_FROM_11.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.0.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_20 = UpgradeFromValues.UPGRADE_FROM_20.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.1.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_21 = UpgradeFromValues.UPGRADE_FROM_21.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.2.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_22 = UpgradeFromValues.UPGRADE_FROM_22.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.3.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_23 = UpgradeFromValues.UPGRADE_FROM_23.toString();

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 2.4.x}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,6 @@
package org.apache.kafka.streams.internals;

public enum UpgradeFromValues {
UPGRADE_FROM_0100("0.10.0"),
UPGRADE_FROM_0101("0.10.1"),
UPGRADE_FROM_0102("0.10.2"),
UPGRADE_FROM_0110("0.11.0"),
UPGRADE_FROM_10("1.0"),
UPGRADE_FROM_11("1.1"),
UPGRADE_FROM_20("2.0"),
UPGRADE_FROM_21("2.1"),
UPGRADE_FROM_22("2.2"),
UPGRADE_FROM_23("2.3"),
UPGRADE_FROM_24("2.4"),
UPGRADE_FROM_25("2.5"),
UPGRADE_FROM_26("2.6"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,6 @@ private static boolean isUpgrade(final Map<String, ?> configs) {
}

switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,6 @@ private boolean isNotUpgrade(final Map<String, ?> configs) {
}

switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,6 @@ private static boolean upgradeFromV0(final Map<String, ?> configs) {
}

switch (UpgradeFromValues.fromString((String) upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.internals.UpgradeFromValues;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
Expand Down Expand Up @@ -59,8 +58,6 @@ public AssignorConfiguration(final Map<String, ?> configs) {
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());

validateUpgradeFrom();

{
final Object o = configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
if (o == null) {
Expand Down Expand Up @@ -94,32 +91,6 @@ public ReferenceContainer referenceContainer() {
return referenceContainer;
}

// cooperative rebalancing was introduced in 2.4 and the old protocol (eager rebalancing) was removed
// in 4.0, meaning live upgrades from 2.3 or below to 4.0+ are no longer possible without a bridge release
public void validateUpgradeFrom() {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (UpgradeFromValues.fromString(upgradeFrom)) {
case UPGRADE_FROM_0100:
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
final String errMsg = String.format(
"The eager rebalancing protocol is no longer supported in 4.0 which means live upgrades from 2.3 or below are not possible."
+ " Please see the Streams upgrade guide for the bridge releases and recommended upgrade path. Got upgrade.from='%s'", upgradeFrom);
log.error(errMsg);
throw new ConfigException(errMsg);

}
}
}

public String logPrefix() {
return logPrefix;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,28 +570,6 @@ public void shouldInterleaveTasksByGroupIdDuringNewAssignment(final Map<String,
assertThat(interleavedTaskIds, equalTo(assignment));
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldThrowOnEagerSubscription(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source1", "source2");

final Set<TaskId> prevTasks = Set.of(
new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)
);
final Set<TaskId> standbyTasks = Set.of(
new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)
);

createMockTaskManager(prevTasks, standbyTasks);
assertThrows(
ConfigException.class,
() -> configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig)
);
}

@ParameterizedTest
@MethodSource("parameter")
public void testCooperativeSubscription(final Map<String, Object> parameterizedConfig) {
Expand Down