diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 8d83d1e99fa5d..f89faffd97126 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -61,8 +61,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -216,25 +214,38 @@ public class TaskManagerTest { @BeforeEach public void setUp() { - taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false); + taskManager = setUpTaskManagerWithoutStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false); } - private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks) { - return setUpTaskManager(processingMode, tasks, false); + private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode processingMode, final TasksRegistry tasks) { + return setUpTaskManagerWithStateUpdater(processingMode, tasks, false); } - private TaskManager setUpTaskManager(final ProcessingMode processingMode, final boolean stateUpdaterEnabled) { - return setUpTaskManager(processingMode, null, stateUpdaterEnabled, false); - } - - private TaskManager setUpTaskManager(final ProcessingMode processingMode, final TasksRegistry tasks, final boolean stateUpdaterEnabled) { - return setUpTaskManager(processingMode, tasks, stateUpdaterEnabled, false); + private TaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode processingMode, + final TasksRegistry tasks, + final boolean processingThreadsEnabled) { + topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode)); + final TaskManager taskManager = new TaskManager( + time, + changeLogReader, + ProcessId.randomProcessId(), + "taskManagerTest", + activeTaskCreator, + standbyTaskCreator, + tasks != null ? tasks : new Tasks(new LogContext()), + topologyMetadata, + adminClient, + stateDirectory, + stateUpdater, + processingThreadsEnabled ? schedulingTaskManager : null + ); + taskManager.setMainConsumer(consumer); + return taskManager; } - private TaskManager setUpTaskManager(final ProcessingMode processingMode, - final TasksRegistry tasks, - final boolean stateUpdaterEnabled, - final boolean processingThreadsEnabled) { + private TaskManager setUpTaskManagerWithoutStateUpdater(final ProcessingMode processingMode, + final TasksRegistry tasks, + final boolean processingThreadsEnabled) { topologyMetadata = new TopologyMetadata(topologyBuilder, new DummyStreamsConfig(processingMode)); final TaskManager taskManager = new TaskManager( time, @@ -247,7 +258,7 @@ private TaskManager setUpTaskManager(final ProcessingMode processingMode, topologyMetadata, adminClient, stateDirectory, - stateUpdaterEnabled ? stateUpdater : null, + null, processingThreadsEnabled ? schedulingTaskManager : null ); taskManager.setMainConsumer(consumer); @@ -261,7 +272,7 @@ public void shouldLockAllTasksOnCorruptionWithProcessingThreads() { .inState(State.RUNNING) .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01)); when(tasks.task(taskId00)).thenReturn(activeTask1); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); @@ -283,7 +294,7 @@ public void shouldLockCommitableTasksOnCorruptionWithProcessingThreads() { .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); @@ -296,14 +307,14 @@ public void shouldLockCommitableTasksOnCorruptionWithProcessingThreads() { @Test public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() { final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTaskIds()).thenReturn(Set.of(taskId00, taskId01)); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); taskManager.handleAssignment( mkMap(mkEntry(taskId00, taskId00Partitions)), - mkMap(mkEntry(taskId01, taskId01Partitions)) + mkMap(mkEntry(taskId01, taskId01Partitions)) ); verify(schedulingTaskManager).lockTasks(Set.of(taskId00, taskId01)); @@ -319,7 +330,7 @@ public void shouldLockAffectedTasksOnHandleRevocation() { .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2)); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); @@ -339,7 +350,7 @@ public void shouldLockTasksOnClose() { .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); when(tasks.allTasks()).thenReturn(Set.of(activeTask1, activeTask2)); final KafkaFuture mockFuture = KafkaFuture.completedFuture(null); when(schedulingTaskManager.lockTasks(any())).thenReturn(mockFuture); @@ -359,7 +370,7 @@ public void shouldResumePollingForPartitionsWithAvailableSpaceForAllActiveTasks( .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2)); taskManager.resumePollingForPartitionsWithAvailableSpace(); @@ -377,7 +388,7 @@ public void shouldUpdateLagForAllActiveTasks() { .inState(State.RUNNING) .withInputPartitions(taskId01Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.activeTasks()).thenReturn(Set.of(activeTask1, activeTask2)); taskManager.updateLags(); @@ -392,7 +403,7 @@ public void shouldRemoveUnusedActiveTaskFromStateUpdaterAndCloseCleanly() { .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future); @@ -412,7 +423,7 @@ public void shouldRemoveUnusedFailedActiveTaskFromStateUpdaterAndCloseDirty() { .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(future); @@ -433,7 +444,7 @@ public void shouldRemoveUnusedStandbyTaskFromStateUpdaterAndCloseCleanly() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future); @@ -453,7 +464,7 @@ public void shouldRemoveUnusedFailedStandbyTaskFromStateUpdaterAndCloseDirty() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToClose)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(standbyTaskToClose.id())).thenReturn(future); @@ -474,7 +485,7 @@ public void shouldCollectFailedTaskFromStateUpdaterAndRethrow() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTask)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(failedStandbyTask.id())).thenReturn(future); @@ -501,7 +512,7 @@ public void shouldUpdateInputPartitionOfActiveTaskInStateUpdater() { .withInputPartitions(taskId03Partitions).build(); final Set newInputPartitions = taskId02Partitions; final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions)); final CompletableFuture future = new CompletableFuture<>(); when(stateUpdater.remove(activeTaskToUpdateInputPartitions.id())).thenReturn(future); @@ -529,7 +540,7 @@ public void shouldRecycleActiveTaskInStateUpdater() { .inState(State.CREATED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle)); when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId03Partitions)) .thenReturn(recycledStandbyTask); @@ -553,7 +564,7 @@ public void shouldHandleExceptionThrownDuringRecyclingActiveTask() { .inState(State.RESTORING) .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToRecycle)); when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, activeTaskToRecycle.inputPartitions())) .thenThrow(new RuntimeException()); @@ -583,7 +594,7 @@ public void shouldRecycleStandbyTaskInStateUpdater() { .inState(State.CREATED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle)); when(activeTaskCreator.createActiveTaskFromStandby(standbyTaskToRecycle, taskId03Partitions, consumer)) .thenReturn(recycledActiveTask); @@ -607,7 +618,7 @@ public void shouldHandleExceptionThrownDuringRecyclingStandbyTask() { .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToRecycle)); when(activeTaskCreator.createActiveTaskFromStandby( standbyTaskToRecycle, @@ -637,7 +648,7 @@ public void shouldKeepReassignedActiveTaskInStateUpdater() { .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask)); taskManager.handleAssignment( @@ -656,7 +667,7 @@ public void shouldMoveReassignedSuspendedActiveTaskToStateUpdater() { .inState(State.SUSPENDED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask)); taskManager.handleAssignment( @@ -676,7 +687,7 @@ public void shouldAddFailedActiveTaskToRecycleDuringAssignmentToTaskRegistry() { .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToRecycle)); final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!"); when(stateUpdater.remove(failedActiveTaskToRecycle.id())) @@ -706,7 +717,7 @@ public void shouldAddFailedStandbyTaskToRecycleDuringAssignmentToTaskRegistry() .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(failedStandbyTaskToRecycle)); final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!"); when(stateUpdater.remove(failedStandbyTaskToRecycle.id())) @@ -736,7 +747,7 @@ public void shouldAddFailedActiveTasksToReassignWithDifferentInputPartitionsDuri .inState(State.RESTORING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(failedActiveTaskToReassign)); final RuntimeException taskException = new RuntimeException("Nobody expects the Spanish inquisition!"); when(stateUpdater.remove(failedActiveTaskToReassign.id())) @@ -769,7 +780,7 @@ public void shouldFirstHandleTasksInStateUpdaterThenSuspendedActiveTasksInTaskRe .inState(State.RESTORING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(reassignedActiveTask1)); when(stateUpdater.tasks()).thenReturn(Set.of(reassignedActiveTask2)); when(stateUpdater.remove(reassignedActiveTask2.id())) @@ -795,7 +806,7 @@ public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions)); taskManager.handleAssignment( @@ -813,7 +824,7 @@ public void shouldKeepReassignedStandbyTaskInStateUpdater() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(reassignedStandbyTask)); taskManager.handleAssignment( @@ -837,7 +848,7 @@ public void shouldAssignMultipleTasksInStateUpdater() { .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(activeTaskToClose, standbyTaskToRecycle)); final CompletableFuture futureForActiveTaskToClose = new CompletableFuture<>(); when(stateUpdater.remove(activeTaskToClose.id())).thenReturn(futureForActiveTaskToClose); @@ -872,7 +883,7 @@ public void shouldReturnRunningTasksStateUpdaterTasksAndTasksToInitInAllTasks() .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTaskInStateUpdater)); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, runningActiveTask))); @@ -896,7 +907,7 @@ public void shouldNotReturnStateUpdaterTasksInOwnedTasks() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId03, activeTask))); assertEquals(taskManager.allOwnedTasks(), mkMap(mkEntry(taskId03, activeTask))); @@ -908,7 +919,7 @@ public void shouldCreateActiveTaskDuringAssignment() { .inState(State.CREATED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final Set createdTasks = Set.of(activeTaskToBeCreated); final Map> tasksToBeCreated = mkMap( mkEntry(activeTaskToBeCreated.id(), activeTaskToBeCreated.inputPartitions())); @@ -926,7 +937,7 @@ public void shouldCreateStandbyTaskDuringAssignment() { .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final Set createdTasks = Set.of(standbyTaskToBeCreated); when(standbyTaskCreator.createTasks(mkMap( mkEntry(standbyTaskToBeCreated.id(), standbyTaskToBeCreated.inputPartitions()))) @@ -953,7 +964,7 @@ public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithState when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToRecycle)); when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId01Partitions)) .thenReturn(standbyTask); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions))); @@ -964,28 +975,6 @@ public void shouldAddRecycledStandbyTasksFromActiveToPendingTasksToInitWithState verify(standbyTaskCreator).createTasks(Collections.emptyMap()); } - @Test - public void shouldAddRecycledStandbyTasksFromActiveToTaskRegistryWithStateUpdaterDisabled() { - final StreamTask activeTaskToRecycle = statefulTask(taskId01, taskId01ChangelogPartitions) - .withInputPartitions(taskId01Partitions) - .inState(State.RUNNING).build(); - final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions) - .withInputPartitions(taskId01Partitions) - .inState(State.CREATED).build(); - final TasksRegistry tasks = mock(TasksRegistry.class); - when(tasks.allTasks()).thenReturn(Set.of(activeTaskToRecycle)); - when(standbyTaskCreator.createStandbyTaskFromActive(activeTaskToRecycle, taskId01Partitions)) - .thenReturn(standbyTask); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false); - - taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01, taskId01Partitions))); - - verify(activeTaskToRecycle).prepareCommit(true); - verify(tasks).replaceActiveWithStandby(standbyTask); - verify(activeTaskCreator).createTasks(consumer, Collections.emptyMap()); - verify(standbyTaskCreator).createTasks(Collections.emptyMap()); - } - @Test public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled() { final StandbyTask standbyTaskToRecycle = standbyTask(taskId03, taskId03ChangelogPartitions) @@ -993,7 +982,7 @@ public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegis .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToRecycle)); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final IllegalStateException illegalStateException = assertThrows( IllegalStateException.class, @@ -1014,7 +1003,7 @@ public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdat .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose)); taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap()); @@ -1032,7 +1021,7 @@ public void shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistr .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToClose)); final IllegalStateException illegalStateException = assertThrows( @@ -1052,7 +1041,7 @@ public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitionsWithStat .withInputPartitions(taskId03Partitions).build(); final Set newInputPartitions = taskId02Partitions; final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToUpdateInputPartitions)); when(tasks.updateActiveTaskInputPartitions(activeTaskToUpdateInputPartitions, newInputPartitions)).thenReturn(true); @@ -1072,7 +1061,7 @@ public void shouldResumeActiveRunningTaskInTasksRegistryWithStateUpdaterEnabled( .inState(State.RUNNING) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume)); taskManager.handleAssignment( @@ -1090,7 +1079,7 @@ public void shouldResumeActiveSuspendedTaskInTasksRegistryAndAddToStateUpdater() .inState(State.SUSPENDED) .withInputPartitions(taskId03Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToResume)); taskManager.handleAssignment( @@ -1112,7 +1101,7 @@ public void shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFou .withInputPartitions(taskId02Partitions).build(); final Set newInputPartitions = taskId03Partitions; final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(standbyTaskToUpdateInputPartitions)); final IllegalStateException illegalStateException = assertThrows( @@ -1137,7 +1126,7 @@ public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() { .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(activeTaskToClose)); taskManager.handleAssignment( @@ -1163,7 +1152,7 @@ public void shouldAddTasksToStateUpdater() { .inState(State.RUNNING).build(); final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); - taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); + taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); @@ -1185,7 +1174,7 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() { when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); final LockException lockException = new LockException("Where are my keys??"); doThrow(lockException).when(task00).initializeIfNeeded(); - taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); + taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); @@ -1209,7 +1198,7 @@ public void shouldRetryInitializationWithBackoffWhenInitializationFails() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); doThrow(new LockException("Lock Exception!")).when(task00).initializeIfNeeded(); - taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); + taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); @@ -1255,7 +1244,7 @@ public void shouldRethrowRuntimeExceptionInInitTaskWithStateUpdater() { when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00)); final RuntimeException runtimeException = new RuntimeException("KABOOM!"); doThrow(runtimeException).when(task00).initializeIfNeeded(); - taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, true); + taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); final StreamsException streamsException = assertThrows( StreamsException.class, @@ -1281,7 +1270,7 @@ public void shouldRethrowTaskCorruptedExceptionFromInitialization() { .inState(State.CREATED) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, tasks, false); when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(statefulTask0, statefulTask1, statefulTask2)); doThrow(new TaskCorruptedException(Collections.singleton(statefulTask0.id))).when(statefulTask0).initializeIfNeeded(); doThrow(new TaskCorruptedException(Collections.singleton(statefulTask1.id))).when(statefulTask1).initializeIfNeeded(); @@ -1302,7 +1291,7 @@ public void shouldRethrowTaskCorruptedExceptionFromInitialization() { public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() { when(stateUpdater.restoresActiveTasks()).thenReturn(true); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); } @@ -1311,7 +1300,7 @@ public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() { public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit() { final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.hasPendingTasksToInit()).thenReturn(true); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); assertFalse(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); } @@ -1319,7 +1308,7 @@ public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAn @Test public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingInit() { final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); } @@ -1539,7 +1528,7 @@ public void shouldCloseTasksWhenRemoveFailedActiveTasksFromStateUpdaterOnPartiti private TaskManager setupForRevocationAndLost(final Set tasksInStateUpdater, final TasksRegistry tasks) { - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(tasksInStateUpdater); return taskManager; @@ -1608,12 +1597,12 @@ private TaskManager setUpTransitionToRunningOfRestoredTask(final Set when(stateUpdater.restoresActiveTasks()).thenReturn(true); when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(statefulTasks); - return setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + return setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); } @Test public void shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() { - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null, false); when(stateUpdater.restoresActiveTasks()).thenReturn(false); assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter)); when(stateUpdater.restoresActiveTasks()).thenReturn(true); @@ -1631,7 +1620,7 @@ public void shouldRethrowStreamsExceptionFromStateUpdater() { when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks)); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final StreamsException thrown = assertThrows( StreamsException.class, @@ -1658,7 +1647,7 @@ public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() { when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks0, exceptionAndTasks1)); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final TaskCorruptedException thrown = assertThrows( TaskCorruptedException.class, @@ -1742,7 +1731,7 @@ public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() { .inState(State.RUNNING) .withInputPartitions(taskId00Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allNonFailedTasks()).thenReturn(Set.of(statefulTask0)); final Set assigned = Set.of(t1p0, t1p1); when(consumer.assignment()).thenReturn(assigned); @@ -1787,7 +1776,7 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalanceWithStateUpdater() .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask, restoringStatefulTask)); when(tasks.allNonFailedTasks()).thenReturn(Set.of(runningStatefulTask)); @@ -1819,7 +1808,7 @@ public void shouldComputeOffsetSumForRunningStatefulTask() { when(runningStatefulTask.changelogOffsets()) .thenReturn(mkMap(mkEntry(t1p0changelog, changelogOffsetOfRunningTask))); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); assertThat( @@ -1850,7 +1839,7 @@ public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() throw final Map changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L)); writeCheckpointFile(taskId00, changelogOffsetInCheckpoint); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask)); taskManager.handleRebalanceStart(singleton("topic")); @@ -1868,7 +1857,7 @@ public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() thro final Map changelogOffsetInCheckpoint = mkMap(mkEntry(t1p0changelog, 24L)); writeCheckpointFile(taskId00, changelogOffsetInCheckpoint); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask)); taskManager.handleRebalanceStart(singleton("topic")); @@ -1893,7 +1882,7 @@ public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTask() { when(restoringStandbyTask.changelogOffsets()) .thenReturn(mkMap(mkEntry(t1p2changelog, changelogOffsetOfRestoringStandbyTask))); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, runningStatefulTask))); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, restoringStatefulTask)); @@ -1918,7 +1907,7 @@ public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() { mkEntry(t1p1changelog2, OffsetCheckpoint.OFFSET_UNKNOWN) )); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, false); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, restoringStatefulTask))); when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask)); @@ -1955,17 +1944,23 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception { ); final Map expectedOffsetSums = mkMap(mkEntry(taskId00, 15L)); + final StandbyTask standbyTask = standbyTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RUNNING) + .withInputPartitions(taskId00Partitions) + .build(); + when(standbyTask.changelogOffsets()).thenReturn(changelogOffsets); + + final TasksRegistry tasks = mock(TasksRegistry.class); + taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); + + when(stateUpdater.tasks()).thenReturn(Set.of(standbyTask)); + expectLockObtainedFor(taskId00); expectDirectoryNotEmpty(taskId00); makeTaskFolders(taskId00.toString()); taskManager.handleRebalanceStart(singleton("topic")); - final StateMachineTask restoringTask = handleAssignment( - emptyMap(), - taskId00Assignment, - emptyMap() - ).get(taskId00); - restoringTask.setChangelogOffsets(changelogOffsets); + taskManager.handleAssignment(emptyMap(), taskId00Assignment); assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums)); } @@ -2174,7 +2169,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() { @Test public void shouldReInitializeStreamsProducerOnHandleLostAllIfEosV2Enabled() { - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); taskManager.handleLostAll(); @@ -2190,7 +2185,7 @@ public void shouldReAddRevivedTasksToStateUpdater() { .inState(State.RUNNING) .withInputPartitions(taskId02Partitions).build(); final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(tasks.task(taskId03)).thenReturn(corruptedActiveTask); when(tasks.task(taskId02)).thenReturn(corruptedStandbyTask); @@ -2281,38 +2276,43 @@ public void suspend() { @Test public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() { - final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); - - final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); - final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); + final StreamTask corruptedTask = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - final Map> firstAssignment = new HashMap<>(taskId00Assignment); - firstAssignment.putAll(taskId01Assignment); + final StreamTask nonCorruptedTask = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING) + .build(); - // `handleAssignment` - when(activeTaskCreator.createTasks(any(), eq(firstAssignment))) - .thenReturn(asList(corruptedTask, nonCorruptedTask)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.task(taskId00)).thenReturn(corruptedTask); + when(tasks.allTasksPerId()).thenReturn(mkMap( + mkEntry(taskId00, corruptedTask), + mkEntry(taskId01, nonCorruptedTask) + )); + when(tasks.activeTaskIds()).thenReturn(Set.of(taskId00, taskId01)); - when(consumer.assignment()) - .thenReturn(assignment) - .thenReturn(taskId00Partitions); + when(nonCorruptedTask.commitNeeded()).thenReturn(true); + when(nonCorruptedTask.prepareCommit(true)).thenReturn(emptyMap()); + when(corruptedTask.prepareCommit(false)).thenReturn(emptyMap()); + doNothing().when(corruptedTask).postCommit(anyBoolean()); - taskManager.handleAssignment(firstAssignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), tp -> assertThat(tp, is(empty()))), is(true)); + when(consumer.assignment()).thenReturn(taskId00Partitions); - assertThat(nonCorruptedTask.state(), is(Task.State.RUNNING)); - nonCorruptedTask.setCommitNeeded(); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); - corruptedTask.setChangelogOffsets(singletonMap(t1p0, 0L)); - taskManager.handleCorruption(singleton(taskId00)); + taskManager.handleCorruption(Set.of(taskId00)); - assertTrue(nonCorruptedTask.commitPrepared); - assertThat(nonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet())); - assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions)); + verify(nonCorruptedTask).prepareCommit(true); + verify(nonCorruptedTask, never()).addPartitionsForOffsetReset(any()); + verify(corruptedTask).addPartitionsForOffsetReset(taskId00Partitions); + verify(corruptedTask).changelogPartitions(); + verify(corruptedTask).postCommit(true); // check that we should not commit empty map either verify(consumer, never()).commitSync(emptyMap()); - verify(stateManager).markChangelogAsCorrupted(taskId00Partitions); } @Test @@ -2359,7 +2359,7 @@ public void shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningSt final TasksRegistry tasks = mock(TasksRegistry.class); when(tasks.allTasksPerId()).thenReturn(mkMap(mkEntry(taskId02, corruptedTask))); when(tasks.task(taskId02)).thenReturn(corruptedTask); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions)); taskManager.handleCorruption(Set.of(taskId02)); @@ -2372,37 +2372,6 @@ public void shouldNotCommitNonCorruptedRestoringActiveTasksAndNotCommitRunningSt verify(standbyTask, never()).postCommit(anyBoolean()); } - @Test - public void shouldNotCommitNonCorruptedRestoringActiveTasksAndCommitRunningStandbyTasksWithStateUpdaterDisabled() { - final StreamTask activeRestoringTask = statefulTask(taskId00, taskId00ChangelogPartitions) - .withInputPartitions(taskId00Partitions) - .inState(State.RESTORING).build(); - final StandbyTask standbyTask = standbyTask(taskId01, taskId01ChangelogPartitions) - .withInputPartitions(taskId01Partitions) - .inState(State.RUNNING).build(); - when(standbyTask.commitNeeded()).thenReturn(true); - final StreamTask corruptedTask = statefulTask(taskId02, taskId02ChangelogPartitions) - .withInputPartitions(taskId02Partitions) - .inState(State.RUNNING).build(); - final TasksRegistry tasks = mock(TasksRegistry.class); - when(tasks.allTasksPerId()).thenReturn(mkMap( - mkEntry(taskId00, activeRestoringTask), - mkEntry(taskId01, standbyTask), - mkEntry(taskId02, corruptedTask) - )); - when(tasks.task(taskId02)).thenReturn(corruptedTask); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false); - when(consumer.assignment()).thenReturn(intersection(HashSet::new, taskId00Partitions, taskId01Partitions, taskId02Partitions)); - - taskManager.handleCorruption(Set.of(taskId02)); - - verify(activeRestoringTask, never()).commitNeeded(); - verify(activeRestoringTask, never()).prepareCommit(true); - verify(activeRestoringTask, never()).postCommit(anyBoolean()); - verify(standbyTask).prepareCommit(true); - verify(standbyTask).postCommit(anyBoolean()); - } - @Test public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() { final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); @@ -2545,7 +2514,7 @@ public void markChangelogAsCorrupted(final Collection partitions @Test public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() { - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); final StreamsProducer producer = mock(StreamsProducer.class); when(activeTaskCreator.streamsProducer()).thenReturn(producer); final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); @@ -2677,7 +2646,7 @@ public void markChangelogAsCorrupted(final Collection partitions @Test public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() { - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); final StreamsProducer producer = mock(StreamsProducer.class); when(activeTaskCreator.streamsProducer()).thenReturn(producer); final ProcessorStateManager stateManager = mock(ProcessorStateManager.class); @@ -2919,7 +2888,7 @@ public void shouldSuspendActiveTasksDuringRevocation() { @Test public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() { final StreamsProducer producer = mock(StreamsProducer.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); final Map offsets00 = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); @@ -3076,7 +3045,7 @@ public void shouldNotCommitIfNoRevokedTasksNeedCommitting() { @Test public void shouldNotCommitIfNoRevokedTasksNeedCommittingWithEOSv2() { - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager); @@ -3212,7 +3181,7 @@ public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdownWithExact } private void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown(final ProcessingMode processingMode) { - final TaskManager taskManager = setUpTaskManager(processingMode, null, false); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(processingMode, null, false); final TopicPartition changelog = new TopicPartition("changelog", 0); final Map> assignment = mkMap( @@ -3367,7 +3336,7 @@ public Set changelogPartitions() { @Test public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() { - setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); + setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, false, stateManager); @@ -3533,7 +3502,7 @@ public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() { new ExceptionAndTask(new RuntimeException(), failedStatefulTask), new ExceptionAndTask(new RuntimeException(), failedStandbyTask)) ); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); taskManager.shutdown(true); @@ -3547,7 +3516,7 @@ public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() { @Test public void shouldShutdownSchedulingTaskManager() { final TasksRegistry tasks = mock(TasksRegistry.class); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks, true); taskManager.shutdown(true); @@ -3596,7 +3565,7 @@ public void shouldShutDownStateUpdaterAndCloseDirtyTasksFailedDuringRemoval() { new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStatefulTaskDuringRemoval), new ExceptionAndTask(new StreamsException("KABOOM!"), removedFailedStandbyTaskDuringRemoval) )); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); futureForRemovedStatefulTask.complete(new StateUpdater.RemovedTaskResult(removedStatefulTask)); futureForRemovedStandbyTask.complete(new StateUpdater.RemovedTaskResult(removedStandbyTask)); futureForRemovedFailedStatefulTask @@ -3825,7 +3794,7 @@ public void shouldCommitViaProducerIfEosV2Enabled() { allOffsets.putAll(offsetsT01); allOffsets.putAll(offsetsT02); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager); task01.setCommittableOffsetsAndMetadata(offsetsT01); @@ -4248,28 +4217,30 @@ public boolean maybePunctuateStreamTime() { @Test public void shouldPunctuateActiveTasks() { - final StateMachineTask task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public boolean maybePunctuateStreamTime() { - return true; - } - @Override - public boolean maybePunctuateSystemTime() { - return true; - } - }; + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RUNNING) + .build(); - when(consumer.assignment()).thenReturn(assignment); - when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + when(task00.maybePunctuateStreamTime()).thenReturn(true); + when(task00.maybePunctuateSystemTime()).thenReturn(true); - taskManager.handleAssignment(taskId00Assignment, emptyMap()); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.activeTasks()).thenReturn(Set.of(task00)); - assertThat(task00.state(), is(Task.State.RUNNING)); + when(stateUpdater.restoresActiveTasks()).thenReturn(false); + when(stateUpdater.hasExceptionsAndFailedTasks()).thenReturn(false); + when(stateUpdater.drainRestoredActiveTasks(any(Duration.class))).thenReturn(Set.of()); + when(stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(List.of()); + + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); // one for stream and one for system time assertThat(taskManager.punctuate(), equalTo(2)); + + verify(task00).maybePunctuateStreamTime(); + verify(task00).maybePunctuateSystemTime(); } @Test @@ -4543,7 +4514,7 @@ public void shouldNotFailForTimeoutExceptionOnConsumerCommit() { @Test public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() { - final TaskManager taskManager = setUpTaskManager(ProcessingMode.EXACTLY_ONCE_V2, false); + final TaskManager taskManager = setUpTaskManagerWithoutStateUpdater(ProcessingMode.EXACTLY_ONCE_V2, null, false); final StreamsProducer producer = mock(StreamsProducer.class); when(activeTaskCreator.streamsProducer()).thenReturn(producer); @@ -4748,7 +4719,7 @@ public void shouldUseStartupTasksFromStateDirectoryAsStandby() { @Test public void shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater() { final Tasks taskRegistry = new Tasks(new LogContext()); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry); final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build(); final StreamTask activeTask = statefulTask(taskId00, taskId00ChangelogPartitions).build(); @@ -4786,7 +4757,7 @@ public void shouldRecycleStartupTasksFromStateDirectoryAsActiveWithStateUpdater( @Test public void shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() { final Tasks taskRegistry = new Tasks(new LogContext()); - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, taskRegistry, true); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, taskRegistry); final StandbyTask startupTask = standbyTask(taskId00, taskId00ChangelogPartitions).build(); when(stateDirectory.hasStartupTasks()).thenReturn(true, true, false); @@ -4816,16 +4787,11 @@ public void shouldUseStartupTasksFromStateDirectoryAsStandbyWithStateUpdater() { assertEquals(Collections.singletonMap(taskId00, startupTask), taskManager.standbyTaskMap()); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void shouldStartStateUpdaterOnInit(final boolean stateUpdaterEnabled) { - final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, stateUpdaterEnabled); + @Test + public void shouldStartStateUpdaterOnInit() { + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, null); taskManager.init(); - if (stateUpdaterEnabled) { - verify(stateUpdater).start(); - } else { - verify(stateUpdater, never()).start(); - } + verify(stateUpdater).start(); } private static KafkaFutureImpl completedFuture() {