-
Notifications
You must be signed in to change notification settings - Fork 57
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
SNOW-569611 Make channel close future idempotent #162
base: master
Are you sure you want to change the base?
Conversation
@@ -65,7 +65,7 @@ void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal chan | |||
// We need to compare the channel sequencer in case the old channel was already been | |||
// removed | |||
return channelInCache != null | |||
&& channelInCache.getChannelSequencer() == channel.getChannelSequencer() | |||
&& channelInCache.getChannelSequencer().equals(channel.getChannelSequencer()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this change out of curiosity? These should be long
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use Long
and i believe we discussed this long time ago, basically we want to avoid the case when the server side sends us a bad request without the channel sequencer and we translate it into 0 if using long
instead of NULL
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes and the reason is that ==
on objects checks if the object is the same while equals()
(if correctly implemented) checks if the value is the same, irrespective of the object that caries it. So this could lead to bugs.
@@ -65,6 +66,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges | |||
// ON_ERROR option for this channel | |||
private final OpenChannelRequest.OnErrorOption onErrorOption; | |||
|
|||
private final CompletableFuture<Void> terminationFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, add one-liner explaining what this is.
@@ -112,6 +115,9 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges | |||
this.encryptionKey = encryptionKey; | |||
this.encryptionKeyId = encryptionKeyId; | |||
this.onErrorOption = onErrorOption; | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, remove space.
* Mark the channel as closed atomically. | ||
* @return {@code true} if the channel was already closed, {@code false} otherwise. | ||
*/ | ||
boolean markClosed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, could this be a synchronized
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me, let's have Toby or Matt give their approval before landing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the change, two questions:
- Could you update SnowflakeStreamingIngestClientInternal.close as well?
- I think your change makes close() idempotent, but it's still possible that others call
isClosed()
before theclose()
finishes, right? So the same issue would still be possible.
@@ -65,7 +65,7 @@ void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal chan | |||
// We need to compare the channel sequencer in case the old channel was already been | |||
// removed | |||
return channelInCache != null | |||
&& channelInCache.getChannelSequencer() == channel.getChannelSequencer() | |||
&& channelInCache.getChannelSequencer().equals(channel.getChannelSequencer()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use Long
and i believe we discussed this long time ago, basically we want to avoid the case when the server side sends us a bad request without the channel sequencer and we translate it into 0 if using long
instead of NULL
if (isClosed()) { | ||
return CompletableFuture.completedFuture(null); | ||
} | ||
if (!markClosed()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the function name might be confusing, might better be markAndGetClosed
?
// here we flush even if removal fails because, for some reason, | ||
// the client sequencers did not match. | ||
// This does not seem like the desirable behavior. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think I understand what you mean here, could you elaborate? But I notice a potential wrong result issue from your question.... please take a look if you're interested. PR: https://github.com/snowflakedb/snowflake-ingest-java/pull/163/files
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sfc-gh-tzhang The removeChannelIfSequencersMatch()
removes the channel from the cache if 1) it was there in the first place and 2) if the client seq matches. If all those conditions are satisfied, then it makes sense to call flush()
, upload and register the created blob. If any of these conditions does not match, and the removal fails (e.g. the client seq is old), then should we flush and upload? or should we just invalidate the channel and move on?
I am not sure if this scenario is realistic but I just wanted to point this out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it makes sense, the validation check is actually in checkValidation()
at the top of this function. If the channel is not in the channel cache, it must be invalid, you could add a logWarn to verify this logic
@@ -45,7 +46,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges | |||
private volatile boolean isValid; | |||
|
|||
// Indicates whether the channel is closed | |||
private volatile boolean isClosed; | |||
private final AtomicReference<Boolean> isClosed; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the difference between AtomicReference and volatile in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here we only use the functionality that it allows to atomically set the value of the ref and get the previous value. This allows us to be sure that do the state transitioning at most once and we perform the actions that should follow the state transition at most once.
Another nice thing about AtomicReference
is that it allows you to do an atomic compare-and-swap
. So this is useful for state machine implementation.
Before, we had:
if (isClosed()) {
return CompletableFuture.completedFuture(null);
}
markClosed();
which could have led to two (or more) threads calling close() and both of them seeing isClosed() == false
due to timing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't AtomicReference<Boolean>
same as AtomicBoolean
?
The reason I did not change this is because I do not know what do we want the semantics of
Could you please clarify what do you mean by the second point? If someone calls |
Actually I think that for the Also we do not do any Finally, should we have for each channel a |
I think we could just make close() synchronized, so any subsequent call to it will wait for the first one to finish. I believed the issue you mentioned to me the other day was that with the current implementation, the first call to |
You are right that the problem was that the first call to Did you have a chance to see my other questions about the |
Allocator.close should be sufficient since we use it the parent allocator when creating the arrow buffers. We were actually calling |
Correct, then it seems like a problem that subsequent close() and isClose() behaves differently? close() will wait but isClose() doesn't, customer could use isClose() and still have the same issue you mentioned. I think we want to keep the behavior consistent, right? |
Oh now I see your point. I think that the best solution to this is to not expose the If we still want to expose the
I did this in the PR btw but I still believe that we should not expose the |
I am not suggesting to expose those "close" methods to the user. The only method the user will see is the "close-and-flush". The others will be internal. I have to think a bit about this and come back to you about the client.close(). |
Makes sense to me, let's do that! @sfc-gh-japatel Are you using the |
I might miss something, but if we make the function |
I am using |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, please make sure you pass all the merge gate, as well as StreamingIngestIT.java
@@ -258,17 +264,23 @@ void invalidate() { | |||
/** @return a boolean to indicate whether the channel is closed or not */ | |||
@Override | |||
public boolean isClosed() { | |||
return this.isClosed; | |||
return this.terminationFuture.isDone(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like KC is using it, I'm fine with both approach depends on whether KC has a workaround
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, then I can leave it given that people are using it and also now it is safer :)
For the tests now,
Now that we changed the semantics of the |
You could modify the test to call .close() first, it shouldn't be an issue
Ah, yes! i think we have a bigger issue (wrong result)! We need isClose to return true right after markClose is called, otherwise we may still insert rows into the buffer even after we mark the channel as closed and start flushing, so we need to change the isClose back. I don't have a good idea now, we could just merge your change to fix part of the idempotent issue, or we could just keep my old logic, both options work for me. |
ed34e34
to
1ea2ac3
Compare
1ea2ac3
to
af788a0
Compare
SNOW-569611
@sfc-gh-tzhang I also left a question as a comment so that we can discuss it either in the PR or we can chat.