Contributor

@shashankhs11 shashankhs11 commented Sep 16, 2025

Changes made

  • Additional setUpTaskManager() overloaded method -- Created this
    temporarily to pass the CI pipelines so that I can work on the failing
    tests incrementally
  • Rewrote 3 tests to use stateUpdater thread

@github-actions github-actions bot added triage PRs from the community streams tests Test fixes (including flaky tests) labels Sep 16, 2025
.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Contributor Author

@shashankhs11 shashankhs11 Sep 16, 2025

These lines added are all temporary. Once we rewrite all the tests, we can do this once in the setUp()

Member

Honestly, these changes to setUpTaskManager are quite confusing, and I don't understand why you made them.

Contributor Author

I agree, it is definitely a bit confusing 😅

I did this because I wanted to identify all the tests that would fail after we removed the stateUpdaterEnabled flag. I thought the safest way to rewrite these tests incrementally would be to add another overloaded method without the flag, so we don't break the CI checks in the meantime. This temporarily adds a lot of unnecessary code, but my plan was to clean it up once all the tests are updated.

Do you think this approach makes sense? I would really appreciate your thoughts, and I'm open to any suggestions.

Member

It is confusing. Maybe you want to rename it more explicitly (setUpTaskManagerWithStateUpdater or setUpTaskManagerWithoutStateUpdater)?
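
A minimal sketch of the renaming suggested above, assuming hypothetical stand-in types: `FakeTaskManager` and the local `ProcessingMode` enum are illustrative only and are not the classes used in TaskManagerTest. The point is that the state-updater choice moves into the helper's name, so call sites no longer depend on the position of a boolean.

```java
// Illustrative only: FakeTaskManager and this ProcessingMode enum are
// hypothetical stand-ins, not the real Kafka Streams test fixtures.
final class TaskManagerSetupSketch {

    enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_V2 }

    /** Records how the manager was configured so a test can assert on it. */
    static final class FakeTaskManager {
        final ProcessingMode mode;
        final boolean stateUpdaterEnabled;
        final boolean processingThreadsEnabled;

        FakeTaskManager(final ProcessingMode mode,
                        final boolean stateUpdaterEnabled,
                        final boolean processingThreadsEnabled) {
            this.mode = mode;
            this.stateUpdaterEnabled = stateUpdaterEnabled;
            this.processingThreadsEnabled = processingThreadsEnabled;
        }
    }

    // Path for tests that have already been rewritten to use the state updater.
    static FakeTaskManager setUpTaskManagerWithStateUpdater(final ProcessingMode mode,
                                                            final boolean processingThreadsEnabled) {
        return new FakeTaskManager(mode, true, processingThreadsEnabled);
    }

    // Temporary path for tests that have not been migrated yet.
    static FakeTaskManager setUpTaskManagerWithoutStateUpdater(final ProcessingMode mode,
                                                               final boolean processingThreadsEnabled) {
        return new FakeTaskManager(mode, false, processingThreadsEnabled);
    }

    public static void main(final String[] args) {
        final FakeTaskManager migrated =
            setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, true);
        final FakeTaskManager legacy =
            setUpTaskManagerWithoutStateUpdater(ProcessingMode.AT_LEAST_ONCE, false);
        System.out.println("migrated uses state updater: " + migrated.stateUpdaterEnabled);
        System.out.println("legacy uses state updater: " + legacy.stateUpdaterEnabled);
    }
}
```

With names like these, deleting the temporary helper later produces compile errors at exactly the call sites that still need migrating.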

Comment on lines 2283 to 2285
public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
final ProcessorStateManager stateManager = mock(ProcessorStateManager.class);

final StateMachineTask corruptedTask = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager);
final StateMachineTask nonCorruptedTask = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager);
public void shouldNotCommitCorruptedTasksOnTaskCorruptedException() {
Contributor Author

I renamed this test from shouldCommitNonCorruptedTasksOnTaskCorruptedException. Based on my understanding, the commit logic happens at the StreamThread level, but only the exception propagation happens in TaskManager with checkStateUpdater. So I decided to omit the check for commit logic and rewrite the test.

Hence, I propose renaming it to shouldNotCommitCorruptedTasksOnTaskCorruptedException.

Please correct me if I am wrong or if I misunderstood!

Member

No, I don't think I agree with this

The key for this test is that non-corrupted tasks are still committed as usual, and the offsets for the corrupted tasks are reset.

        assertTrue(nonCorruptedTask.commitPrepared);
        assertThat(nonCorruptedTask.partitionsForOffsetReset, equalTo(Collections.emptySet()));
        assertThat(corruptedTask.partitionsForOffsetReset, equalTo(taskId00Partitions));

        // check that we should not commit empty map either
        verify(consumer, never()).commitSync(emptyMap());
        verify(stateManager).markChangelogAsCorrupted(taskId00Partitions);

This is still a valid test!

But maybe we can skip the handleAssignment / complete restoration part if we immediately mock a RUNNING task?
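
The contract those assertions check can be shown with a toy model. This is a sketch under loud assumptions: `ToyTask` and `handleCorruption` are hypothetical and are not Kafka Streams code; they only mirror the two fields the assertions read (`commitPrepared` and `partitionsForOffsetReset`).

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Toy model only: not the TaskManager implementation. It mirrors the
// property under test: healthy tasks get committed, corrupted tasks get
// their input partitions scheduled for an offset reset.
final class CorruptionHandlingSketch {

    /** Hypothetical task that records only what the assertions inspect. */
    static final class ToyTask {
        final String id;
        final Set<String> inputPartitions;
        boolean commitPrepared = false;
        Set<String> partitionsForOffsetReset = Collections.emptySet();

        ToyTask(final String id, final Set<String> inputPartitions) {
            this.id = id;
            this.inputPartitions = inputPartitions;
        }
    }

    // Hypothetical handler: commit everything that is not corrupted,
    // and mark the corrupted tasks' partitions for offset reset.
    static void handleCorruption(final Collection<ToyTask> runningTasks,
                                 final Set<String> corruptedTaskIds) {
        for (final ToyTask task : runningTasks) {
            if (corruptedTaskIds.contains(task.id)) {
                task.partitionsForOffsetReset = task.inputPartitions;
            } else {
                task.commitPrepared = true;
            }
        }
    }

    public static void main(final String[] args) {
        final ToyTask corrupted = new ToyTask("0_0", Set.of("topic1-0"));
        final ToyTask healthy = new ToyTask("0_1", Set.of("topic1-1"));

        handleCorruption(List.of(corrupted, healthy), Set.of("0_0"));

        System.out.println("healthy commitPrepared: " + healthy.commitPrepared);
        System.out.println("healthy reset set empty: " + healthy.partitionsForOffsetReset.isEmpty());
        System.out.println("corrupted reset set: " + corrupted.partitionsForOffsetReset);
    }
}
```

Both tasks start out as already running here, which corresponds to the suggestion to skip assignment and restoration and begin from a RUNNING task.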

Contributor Author

I rewrote this test again in 6df4e79

@shashankhs11
Contributor Author

I rewrote only 3 tests for now. I wanted to ensure that my approach is correct before proceeding further.
@lucasbru -- tagging for review

@github-actions github-actions bot removed the triage PRs from the community label Sep 17, 2025
@lucasbru lucasbru self-assigned this Sep 18, 2025
@lucasbru lucasbru requested a review from Copilot September 18, 2025 08:21
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR removes dead tests and rewrites 3 existing tests in TaskManagerTest to use the stateUpdater thread pattern. An additional overloaded setUpTaskManager() method was temporarily created to pass CI pipelines while working on failing tests incrementally.

  • Removed 3 dead tests that were no longer needed
  • Rewrote 3 tests to use stateUpdater thread instead of direct task manipulation
  • Added temporary overloaded setup method for incremental CI fixes

@BeforeEach
public void setUp() {
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false);
taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, null, false, false);
Copilot AI Sep 18, 2025

The method call now has two boolean parameters without clear meaning. Consider using named parameters or method overloading to make the intent clearer. The current call setUpTaskManager(..., false, false) is ambiguous about what each boolean controls.

.withInputPartitions(taskId00Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copilot AI Sep 18, 2025

Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.
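
The hazard described above can be reproduced in isolation. In this sketch, `setUp` and `Config` are hypothetical stand-ins for the test helper, and it is an assumption that the three-argument overload always enables the state updater. The same literal in the third position configures a different feature depending on how many arguments follow it.

```java
// Illustrative only: setUp and Config are hypothetical, modeled on the
// two overload shapes described in the review comment.
final class OverloadAmbiguitySketch {

    static final class Config {
        final boolean stateUpdaterEnabled;
        final boolean processingThreadsEnabled;

        Config(final boolean stateUpdaterEnabled, final boolean processingThreadsEnabled) {
            this.stateUpdaterEnabled = stateUpdaterEnabled;
            this.processingThreadsEnabled = processingThreadsEnabled;
        }
    }

    // Four-argument shape: the third argument is stateUpdaterEnabled.
    static Config setUp(final String mode, final Object tasks,
                        final boolean stateUpdaterEnabled,
                        final boolean processingThreadsEnabled) {
        return new Config(stateUpdaterEnabled, processingThreadsEnabled);
    }

    // Three-argument shape: the third argument is processingThreadsEnabled.
    // Assumption: the state updater is always on in this variant.
    static Config setUp(final String mode, final Object tasks,
                        final boolean processingThreadsEnabled) {
        return new Config(true, processingThreadsEnabled);
    }

    public static void main(final String[] args) {
        final Config fourArgs = setUp("AT_LEAST_ONCE", null, false, true);
        final Config threeArgs = setUp("AT_LEAST_ONCE", null, false);

        // The third argument is `false` in both calls, yet it switches off
        // a different feature in each.
        System.out.println("4 args: stateUpdater=" + fourArgs.stateUpdaterEnabled
            + ", processingThreads=" + fourArgs.processingThreadsEnabled);
        System.out.println("3 args: stateUpdater=" + threeArgs.stateUpdaterEnabled
            + ", processingThreads=" + threeArgs.processingThreadsEnabled);
    }
}
```

The compiler resolves both calls without complaint, so nothing flags a call site where an argument was dropped or added by mistake.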

.withInputPartitions(taskId01Partitions).build();
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copilot AI Sep 18, 2025

Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.

public void shouldLockActiveOnHandleAssignmentWithProcessingThreads() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true, true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
Copilot AI Sep 18, 2025

Multiple test methods are calling the 3-parameter setUpTaskManager method with true for the third parameter, but this creates ambiguity about which overloaded method is being called. The new 3-parameter method expects processingThreadsEnabled while the old 4-parameter method expects stateUpdaterEnabled as the third parameter. This could lead to confusion and potential bugs when the temporary method is removed.

Member

@lucasbru lucasbru left a comment

Thanks. I left some comments!


final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, false);

assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));
Member

For my understanding - why do we actually need to call checkStateUpdater here?

Contributor Author

You're right! It's not actually required for this specific test case; I included it as a safety check to ensure that punctuation happens only when the system is "ready". We can safely omit the line.

}

@Test
public void shouldPunctuateActiveTasks() {
Member

This test seems to be testing what it should. The question is just whether it can be simplified (see below).

Contributor Author

We can safely omit this line

assertTrue(taskManager.checkStateUpdater(time.milliseconds(), noOpResetter));

Member

Let's omit it then

@lucasbru
Member

@shashankhs11 let me know when you need another review here

@shashankhs11
Contributor Author

@lucasbru I have made the changes as suggested. Tagging for review
