Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1863] fix(server): Fail commitShuffle when write Operation Fails #1866

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

sahibamatta
Copy link

@sahibamatta sahibamatta commented Jul 5, 2024

What changes were proposed in this pull request?

This PR lets the method ShuffleTaskManager.commitShuffle(..) fail when storageManager.write(..) method throws an exception.

  • Wrapped the write method in try-catch block for exception handling.
  • When write method throws an exception or write is not successful , then the writeError is set to true in the catch block.
  • Removed the throw new EventRetryException when writeSuccess is false in the constructor, as the ShuffleFlushManager is unable to initialize then.
  • During the commitShuffle, when the getCommittedBlockIds method gets called, it checks the writeError variable and throws an exception if it is found set.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing UTs.

@rickyma
Copy link
Contributor

rickyma commented Jul 6, 2024

When the write operation fails and throws an exception, we need to make the commitShuffle fail as well. According to your current approach, the commitShuffle will succeed regardless of whether the write operation is successful or not, which makes commitShuffle meaningless. The purpose of commitShuffle is to ensure that all events are successfully written in sync, with no residual shuffle data left in memory, before starting the downstream read stage.

@sahibamatta
Copy link
Author

sahibamatta commented Jul 10, 2024

When the write operation fails and throws an exception, we need to make the commitShuffle fail as well. According to your current approach, the commitShuffle will succeed regardless of whether the write operation is successful or not, which makes commitShuffle meaningless. The purpose of commitShuffle is to ensure that all events are successfully written in sync, with no residual shuffle data left in memory, before starting the downstream read stage.

Could you help me explain the fix expectation for the issue? The issue states that the commitShuffle will get stuck forever if the exception occurs in flush process i.e. processFlushEvent. I can see that there's a while(true) loop, through which the code only exits, when the committed block ids is found
image
. In case the block ids is not found, it runs forever in the loop or until timeout happens. Do you think it makes sense to alter the looping part (should remove while(true)) or maybe when the exception gets thrown from ShuffleFlushManager - storageManager.write part, throw a certain kind of exception which is non-retryable.

Do you think bucketing exception as retryable/non-retryable will make sense in this scenario? Also, let me know if we can connect and discuss/brainstorm it together or maybe you can provide me a little brief of the problem and approaches to solve? Thanks!

@rickyma
Copy link
Contributor

rickyma commented Jul 10, 2024

When an exception happens during the flushing process, just let commitShuffle know, make it fail.

@sahibamatta
Copy link
Author

sahibamatta commented Jul 16, 2024

When an exception happens during the flushing process, just let commitShuffle know, make it fail.

Hi made the changes in the PR. So now, when the storageManager.write(..) operation fails inside the processFlushEvent method, a writeError variable is set. When the commitShuffle runs, it calls the getCommittedBlockIds, which will first check the writeError variable, if found set it will throw: EventDiscardException. There a few things to note/questions though:

  • For now, the commitShuffle() will only fail, when the write() operation inside the processFlushEvent() fails. Do you think the logic looks fine and we should extend it for the entire processFlushEvent() i.e. fail commitShuffle , whenever there is any failure/exception inside the processFlushEvent() method?
  • For now, I'm throwing EventDiscardException exception in case the write operation fails, let me know if it works or we may need to throw any other exception type?
  • Throwing exceptions from constructor, puts ShuffleFlushManager in an undesirable state. Do you think we should instead delay throwing the exception, when a method like getCommittedBlockIds() gets called? Something similar to what I did in my current implementation?
  • I didn't add any UTs as of now, as I'm unsure how to generate a test scenario for this. We may need a mechanism to set writeError through UT so that we can assert commitShuffle() failing when write operation fails.

@sahibamatta sahibamatta changed the title [#1863] fix(server): Update Comitted Block Ids Even if Storage Manage… [#1863] fix(server): Fail commitShuffle when Write Operation Fails Jul 16, 2024
@sahibamatta sahibamatta changed the title [#1863] fix(server): Fail commitShuffle when Write Operation Fails [#1863] fix(server): Fail commitShuffle when write Operation Fails Jul 16, 2024
Copy link

Test Results

 2 647 files   - 10   2 647 suites   - 10   5h 28m 26s ⏱️ - 3m 31s
   946 tests ± 0     944 ✅ ± 0   1 💤 ±0  0 ❌  - 1  1 🔥 +1 
11 789 runs   - 10  11 773 ✅  - 10  15 💤 ±0  0 ❌  - 1  1 🔥 +1 

For more details on these errors, see this check.

Results for commit 402bf31. ± Comparison against base commit 7731998.

@rickyma
Copy link
Contributor

rickyma commented Jul 17, 2024

I think you haven't fully understood this issue. A flush event corresponds to a specific appId/shuffleId/partitionId. If the event for this appId/shuffleId/partitionId fails, then when committing the shuffle for this specific shuffleId, it should fail too. Instead of using a global boolean value writeError in ShuffleFlushManager to determine this, which would cause the commitShuffle that does not belong to this appId/shuffleId/partitionId to fail as well. We don't want this to happen.

For this issue, you'd better conduct some tests.

@sahibamatta
Copy link
Author

I think you haven't fully understood this issue. A flush event corresponds to a specific appId/shuffleId/partitionId. If the event for this appId/shuffleId/partitionId fails, then when committing the shuffle for this specific shuffleId, it should fail too. Instead of using a global boolean value writeError in ShuffleFlushManager to determine this, which would cause the commitShuffle that does not belong to this appId/shuffleId/partitionId to fail as well. We don't want this to happen.

For this issue, you'd better conduct some tests.

Apologies, for my little context and understanding of the issue, being new to the repo, it's taking sometime to get a hang of things. As mentioned by you, that a flushEvent and commitShuffle is shuffle specific, so instead of a single global variable, created a global list: shuffleIdsWithWriteError, which will contain the shuffleIds which had erroneous write. During the commitShuffle, when getCommittedBlockIds gets called, this list gets checked for the provided shuffleId, and if found it throws: EventDiscardException.

Added a UT too for it. Let me know if it makes sense to you.

startTime = System.currentTimeMillis();
boolean writeSuccess = storageManager.write(storage, handler, event);
if (!writeSuccess) {
shuffleIdsWithWriteError.add(event.getShuffleId());
Copy link
Contributor

@rickyma rickyma Jul 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is thread-safe.

It could be overwritten by other threads which may be successful.

You need to reconsider concurrency problems through this PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the delayed response, but here I used CopyOnWriteArrayList for implementing shuffleIdsWithWriteError, do you still think thread safety is a concern here, or do you think the other parts of the code are not thread safe?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please help to review this.Thx.@maobaolong

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants