SNOW-569611 Make channel close future idempotent #162

Open
wants to merge 3 commits into
base: master
Original file line number Diff line number Diff line change
@@ -52,7 +52,9 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) {
void closeAllChannels() {
this.cache
.values()
.forEach(channels -> channels.values().forEach(channel -> channel.markClosed()));
.forEach(
channels ->
channels.values().forEach(SnowflakeStreamingIngestChannelInternal::markClosed));
}

/** Remove a channel in the channel cache if the channel sequencer matches */
@@ -65,7 +67,7 @@ void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal chan
// We need to compare the channel sequencer in case the old channel has already been
// removed
return channelInCache != null
&& channelInCache.getChannelSequencer() == channel.getChannelSequencer()
&& channelInCache.getChannelSequencer().equals(channel.getChannelSequencer())
Contributor

Why this change, out of curiosity? These should be long.

Contributor

They use Long, and I believe we discussed this a long time ago. Basically, we want to avoid the case where the server side sends us a bad request without the channel sequencer and we translate it into 0 (which is what would happen with long) instead of null.

Contributor Author

Yes, and the reason is that == on objects checks whether the two references point to the same object, while equals() (if correctly implemented) checks whether the values are the same, irrespective of the object that carries them. So using == here could lead to bugs.
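
To make the point above concrete, here is a minimal sketch (not taken from the PR) of how == and equals() can disagree on boxed Long values. The class BoxedLongComparison and the helper sameSequencer are hypothetical names used only for illustration; Objects.equals is used as a null-tolerant value comparison, which is an assumption about what a caller might want when a sequencer can be missing.

```java
import java.util.Objects;

// Hypothetical illustration; not part of the PR.
class BoxedLongComparison {
  /** Value comparison that also tolerates null (for example, a missing sequencer). */
  static boolean sameSequencer(Long a, Long b) {
    return Objects.equals(a, b);
  }

  public static void main(String[] args) {
    Long a = 1000L;
    Long b = 1000L;
    // Identity comparison. Long.valueOf is only documented to cache -128..127,
    // so for larger values the two boxes are typically different objects.
    System.out.println("a == b      : " + (a == b));
    // Value comparison: true whenever both boxes hold the same number.
    System.out.println("a.equals(b) : " + a.equals(b));
    // Null-safe value comparison: no NullPointerException when one side is null.
    System.out.println("null vs b   : " + sameSequencer(null, b));
  }
}
```

Because small values are cached, an == comparison can appear to work in tests with low sequencer numbers and only misbehave once the values grow.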

&& v.remove(channel.getName()) != null
&& v.isEmpty()
? null
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
@@ -45,7 +46,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges
private volatile boolean isValid;

// Indicates whether the channel is closed
private volatile boolean isClosed;
private final AtomicReference<Boolean> isClosed;
Contributor

What's the difference between AtomicReference and volatile in this case?

Contributor Author

Here we only rely on its ability to atomically set the value of the reference and get the previous value. This guarantees that the state transition happens at most once, and that the actions that should follow the transition are also performed at most once.

Another nice thing about AtomicReference is that it allows an atomic compare-and-swap, which is useful for implementing state machines.

Before, we had:

 if (isClosed()) {
      return CompletableFuture.completedFuture(null);
 }
 markClosed();

which could have led to two (or more) threads calling close() and both of them seeing isClosed() == false due to timing.
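
The following is a small sketch of the getAndSet approach described above, under the assumption that only the close flag matters. CloseOnceDemo and countWinners are hypothetical names and stand in for the real channel class; the code starts several threads that all call markClosed() and counts how many of them observed the open-to-closed transition.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the channel; only the close flag is modeled.
class CloseOnceDemo {
  private final AtomicReference<Boolean> isClosed = new AtomicReference<>(false);

  /** @return true if the channel was already closed before this call. */
  boolean markClosed() {
    return isClosed.getAndSet(true);
  }

  /** Calls markClosed() from several threads and counts the calls that won the transition. */
  static int countWinners(int threads) {
    CloseOnceDemo channel = new CloseOnceDemo();
    AtomicInteger winners = new AtomicInteger();
    CountDownLatch start = new CountDownLatch(1);
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < threads; i++) {
      pool.execute(
          () -> {
            try {
              start.await(); // release all threads at roughly the same time
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            }
            if (!channel.markClosed()) {
              winners.incrementAndGet(); // close-time work would go here
            }
          });
    }
    start.countDown();
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    return winners.get();
  }

  public static void main(String[] args) {
    System.out.println("threads that performed the close: " + countWinners(8));
  }
}
```

With a plain check-then-set on a volatile field, more than one thread could pass the check before any of them sets the flag; with getAndSet, only one call can ever receive false.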

Collaborator

Isn't AtomicReference<Boolean> same as AtomicBoolean?
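
For comparison, here is a sketch of what the AtomicBoolean variant asked about above might look like. AtomicBooleanFlag and tryClose are hypothetical names, not part of the PR. For this use the two types behave the same way; one difference worth knowing is that AtomicReference.compareAndSet compares references by identity, which happens to work for autoboxed Boolean values because boxing always yields the canonical Boolean.TRUE and Boolean.FALSE, whereas AtomicBoolean avoids boxing altogether.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical variant of the close flag using AtomicBoolean instead of AtomicReference<Boolean>.
class AtomicBooleanFlag {
  private final AtomicBoolean isClosed = new AtomicBoolean(false);

  boolean isClosed() {
    return isClosed.get();
  }

  /** @return true if the flag was already set before this call. */
  boolean markClosed() {
    return isClosed.getAndSet(true);
  }

  /** Compare-and-set variant. @return true only for the call that moved the flag from false to true. */
  boolean tryClose() {
    return isClosed.compareAndSet(false, true);
  }
}
```

Note that markClosed() and tryClose() return opposite answers for the winning call, so the choice between them also affects how the call site reads.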


// Reference to the client that owns this channel
private final SnowflakeStreamingIngestClientInternal owningClient;
@@ -65,6 +66,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges
// ON_ERROR option for this channel
private final OpenChannelRequest.OnErrorOption onErrorOption;

private final CompletableFuture<Void> terminationFuture;
Contributor

Nit: add a one-line comment explaining what this is.


/**
* Constructor for TESTING ONLY which allows us to set the test mode
*
@@ -99,7 +102,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges
this.channelSequencer = channelSequencer;
this.rowSequencer = new AtomicLong(rowSequencer);
this.isValid = true;
this.isClosed = false;
this.isClosed = new AtomicReference<>(false);
this.owningClient = client;
this.isTestMode = isTestMode;
this.allocator =
@@ -112,6 +115,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges
this.encryptionKey = encryptionKey;
this.encryptionKeyId = encryptionKeyId;
this.onErrorOption = onErrorOption;
this.terminationFuture = new CompletableFuture<>();

logger.logDebug("Channel={} created for table={}", this.channelName, this.tableName);
}

@@ -255,20 +260,31 @@ void invalidate() {
rowSequencer);
}

/** @return a boolean to indicate whether the channel is closed or not */
/**
* @return {@code true} if the channel was closed, {@code false} otherwise. A return value of
* {@code true} does not mean that the data is committed. For this, you should call {@link
* #close()} and wait on the returned future.
*/
@Override
public boolean isClosed() {
return this.isClosed;
return this.isClosed.get();
}

/** Mark the channel as closed */
void markClosed() {
this.isClosed = true;
logger.logDebug(
"Channel is closed, name={}, channel sequencer={}, row sequencer={}",
getFullyQualifiedName(),
channelSequencer,
rowSequencer);
/**
* Mark the channel as closed atomically.
*
* @return {@code true} if the channel was already closed, {@code false} otherwise.
*/
boolean markClosed() {
Contributor

Alternatively, could this be a synchronized method?
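
As a rough sketch of the synchronized alternative raised above (SynchronizedFlag is a hypothetical stand-in, not code from the PR): reading the old value and writing the new one under the same lock gives the same at-most-once guarantee as getAndSet, at the cost of taking a monitor on every call. Readers then also need to synchronize, or the field needs to be volatile, to see the latest value.

```java
// Hypothetical synchronized variant of the close flag; not part of the PR.
class SynchronizedFlag {
  private boolean isClosed = false; // guarded by "this"

  synchronized boolean isClosed() {
    return isClosed;
  }

  /** @return true if the flag was already set before this call. */
  synchronized boolean markClosed() {
    boolean wasAlreadyClosed = isClosed;
    isClosed = true;
    return wasAlreadyClosed;
  }
}
```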

final boolean wasAlreadyClosed = this.isClosed.getAndSet(true);
if (!wasAlreadyClosed) {
logger.logDebug(
"Channel is closed, name={}, channel sequencer={}, row sequencer={}",
getFullyQualifiedName(),
channelSequencer,
rowSequencer);
}
return wasAlreadyClosed;
}

/**
@@ -281,7 +297,9 @@ CompletableFuture<Void> flush(boolean closing) {
// Skip this check for closing because we need to set the channel to closed first and then flush
// in case there is any leftover rows
if (isClosed() && !closing) {
throw new SFException(ErrorCode.CLOSED_CHANNEL);
final CompletableFuture<Void> res = new CompletableFuture<>();
res.completeExceptionally(new SFException(ErrorCode.CLOSED_CHANNEL));
return res;
}

// Simply return if there is no data in the channel, this might not work if we support public
@@ -294,38 +312,51 @@ CompletableFuture<Void> flush(boolean closing) {
}

/**
* Close the channel (this will flush in-flight buffered data)
* Close the channel and flush in-flight buffered data.
*
* @return future which will be complete when the channel is closed
* @return a {@link CompletableFuture} which will be completed when the channel is closed and the
* data is successfully committed or has failed to be committed for some reason.
*/
@Override
public CompletableFuture<Void> close() {
checkValidation();

if (isClosed()) {
return CompletableFuture.completedFuture(null);
}
if (!markClosed()) {
Contributor

The function name might be confusing; would markAndGetClosed be better?

this.owningClient.removeChannelIfSequencersMatch(this);

markClosed();
this.owningClient.removeChannelIfSequencersMatch(this);
return flush(true)
.thenRunAsync(
() -> {
List<SnowflakeStreamingIngestChannelInternal> uncommittedChannels =
this.owningClient.verifyChannelsAreFullyCommitted(
Collections.singletonList(this));

this.arrowBuffer.close();

// Throw an exception if the channel has any uncommitted rows
if (!uncommittedChannels.isEmpty()) {
throw new SFException(
ErrorCode.CHANNEL_WITH_UNCOMMITTED_ROWS,
uncommittedChannels.stream()
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
});
// here we flush even if removal fails because, for some reason,
// the client sequencers did not match.
// This does not seem like the desirable behavior.
Comment on lines +327 to +329
Contributor

I don't think I understand what you mean here; could you elaborate? However, your question made me notice a potential wrong-result issue. Please take a look if you're interested. PR: https://github.com/snowflakedb/snowflake-ingest-java/pull/163/files

Contributor Author

@sfc-gh-tzhang removeChannelIfSequencersMatch() removes the channel from the cache if 1) it was there in the first place and 2) the client sequencer matches. If both conditions are satisfied, then it makes sense to call flush(), upload, and register the created blob. If either condition is not met and the removal fails (e.g. the client sequencer is old), should we still flush and upload? Or should we just invalidate the channel and move on?

I am not sure whether this scenario is realistic, but I wanted to point it out.

Contributor

Yeah, that makes sense. The validation check is actually in checkValidation() at the top of this function. If the channel is not in the channel cache, it must be invalid. You could add a logWarn to verify this logic.


flush(true)
.thenRunAsync(
() -> {
List<SnowflakeStreamingIngestChannelInternal> uncommittedChannels =
this.owningClient.verifyChannelsAreFullyCommitted(
Collections.singletonList(this));

this.arrowBuffer.close();

// Throw an exception if the channel has any uncommitted rows
if (!uncommittedChannels.isEmpty()) {
throw new SFException(
ErrorCode.CHANNEL_WITH_UNCOMMITTED_ROWS,
uncommittedChannels.stream()
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
.collect(Collectors.toList()));
}
})
.handle(
(ignored, t) -> {
if (t != null) {
terminationFuture.completeExceptionally(t);
} else {
terminationFuture.complete(null);
}
return null;
});
}
return terminationFuture;
}
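
The overall pattern in close() above can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the PR's implementation: IdempotentCloser is a hypothetical class, flushAndCommit() is a placeholder for the real flush and commit verification, and whenComplete is used here where the PR uses handle.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of an idempotent close; not the PR's channel class.
class IdempotentCloser {
  private final AtomicBoolean isClosed = new AtomicBoolean(false);
  // Completed exactly once, when the single close attempt finishes.
  private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();
  // Counts how often the placeholder flush ran, for demonstration only.
  final AtomicInteger flushCount = new AtomicInteger();

  /** Placeholder for the real flush and commit verification. */
  private CompletableFuture<Void> flushAndCommit() {
    flushCount.incrementAndGet();
    return CompletableFuture.completedFuture(null);
  }

  /** Every caller gets the same future; only the first call starts the work. */
  CompletableFuture<Void> close() {
    if (!isClosed.getAndSet(true)) {
      flushAndCommit()
          .whenComplete(
              (ignored, t) -> {
                if (t != null) {
                  terminationFuture.completeExceptionally(t);
                } else {
                  terminationFuture.complete(null);
                }
              });
    }
    return terminationFuture;
  }
}
```

Returning the shared future rather than an already-completed one means that a second caller of close() also waits for the commit outcome, instead of being told the channel is closed while data may still be in flight.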

/**
Original file line number Diff line number Diff line change
@@ -528,7 +528,7 @@ List<BlobMetadata> getRetryBlobs(

/** Close the client, which will flush first and then release all the resources */
@Override
public void close() throws Exception {
public synchronized void close() throws Exception {
if (isClosed) {
return;
}
Original file line number Diff line number Diff line change
@@ -583,8 +583,17 @@ public void testFlush() throws Exception {
channel.close().get();
try {
channel.flush(false).get();
} catch (SFException e) {
Assert.assertEquals(ErrorCode.CLOSED_CHANNEL.getMessageCode(), e.getVendorCode());
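} catch (Exception e) {
// Walk the cause chain, since get() wraps the failure in an ExecutionException
Throwable t = e;
while (t != null) {
if (t instanceof SFException) {
Assert.assertEquals(
ErrorCode.CLOSED_CHANNEL.getMessageCode(), ((SFException) t).getVendorCode());
return;
}
// Advance from the current throwable; e.getCause() would never move past the first cause
t = t.getCause();
}
Assert.fail("Wrong exception: " + e.getMessage());
}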
}
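
The cause-chain walk in the test above could be factored into a small helper. The sketch below is one possible shape, with CauseChain and findCause as hypothetical names; it uses IllegalStateException in place of SFException so that it is self-contained.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical test helper; not part of the PR.
class CauseChain {
  /** @return the first throwable of the given type in the chain starting at t, or null if none. */
  static <T extends Throwable> T findCause(Throwable t, Class<T> type) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (type.isInstance(cur)) {
        return type.cast(cur);
      }
    }
    return null;
  }

  public static void main(String[] args) throws InterruptedException {
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("closed"));
    try {
      failed.get();
    } catch (ExecutionException e) {
      // get() wraps the original failure in an ExecutionException
      IllegalStateException cause = findCause(e, IllegalStateException.class);
      System.out.println(cause == null ? "not found" : cause.getMessage());
    }
  }
}
```

The loop advances from the current throwable on each step, so it terminates when the chain ends even if no throwable of the requested type is present.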
