Skip to content

Commit

Permalink
Issue 3219: Prevent stale connections for sparse writers. (pravega#3409)
Browse files Browse the repository at this point in the history
* Rename flush command so as to not be part of the wire protocol
* Add an empty write to the flush call inside of handleLogSealed.

Signed-off-by: Tom Kaitchuck <[email protected]>
  • Loading branch information
tkaitchuck authored and shrids committed Mar 28, 2019
1 parent f4ae9cf commit d463831
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.pravega.client.stream.impl.Controller;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.ByteBufferUtils;
import io.pravega.common.util.Retry;
import io.pravega.common.util.Retry.RetryWithBackoff;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
Expand All @@ -27,7 +28,6 @@
import io.pravega.shared.protocol.netty.WireCommands;
import io.pravega.shared.protocol.netty.WireCommands.SegmentIsTruncated;
import io.pravega.shared.protocol.netty.WireCommands.SegmentRead;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -98,7 +98,7 @@ public void segmentIsSealed(WireCommands.SegmentIsSealed segmentIsSealed) {
segmentIsSealed.getOffset(),
true,
true,
ByteBuffer.allocate(0),
ByteBufferUtils.EMPTY,
segmentIsSealed.getRequestId()));
}
}
Expand Down
4 changes: 2 additions & 2 deletions client/src/main/java/io/pravega/client/stream/StreamCut.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
package io.pravega.client.stream;

import io.pravega.client.stream.impl.StreamCutInternal;

import io.pravega.common.util.ByteBufferUtils;
import java.io.Serializable;
import java.nio.ByteBuffer;

Expand All @@ -31,7 +31,7 @@ public interface StreamCut extends Serializable {

@Override
public ByteBuffer toBytes() {
return ByteBuffer.allocate(0);
return ByteBufferUtils.EMPTY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.pravega.client.stream.TxnFailedException;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.util.ByteBufferUtils;
import io.pravega.common.util.Retry;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -158,7 +159,16 @@ private void handleLogSealed(Segment segment) {
* inflight that will need to be resent to the new segment when the write lock
* is released. (To preserve order)
*/
flushInternal();
for (SegmentOutputStream writer : selector.getWriters()) {
try {
writer.write(PendingEvent.withoutHeader(null, ByteBufferUtils.EMPTY, null));
writer.flush();
} catch (SegmentSealedException e) {
// Segment sealed exception observed during a flush. Re-run flush on all the
// available writers.
log.info("Flush on segment {} failed due to {}, it will be retried.", writer.getSegmentName(), e.getMessage());
}
}
toSeal = sealedSegmentQueue.poll();
log.info("Sealing another segment {} ", toSeal);
}
Expand Down Expand Up @@ -346,24 +356,20 @@ public void flush() {
synchronized (writeFlushLock) {
boolean success = false;
while (!success) {
success = flushInternal();
}
}
}

private boolean flushInternal() {
boolean success = true;
for (SegmentOutputStream writer : selector.getWriters()) {
try {
writer.flush();
} catch (SegmentSealedException e) {
// Segment sealed exception observed during a flush. Re-run flush on all the
// available writers.
success = false;
log.warn("Flush on segment {} failed due to {}, it will be retried.", writer.getSegmentName(), e.getMessage());
success = true;
for (SegmentOutputStream writer : selector.getWriters()) {
try {
writer.flush();
} catch (SegmentSealedException e) {
// Segment sealed exception observed during a flush. Re-run flush on all the
// available writers.
success = false;
log.warn("Flush on segment {} failed due to {}, it will be retried.", writer.getSegmentName(), e.getMessage());
break;
}
}
}
}
return success;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.client.tables.impl;

import io.pravega.common.util.ByteBufferUtils;
import java.io.Serializable;
import java.nio.ByteBuffer;

Expand All @@ -30,7 +31,7 @@ public long getSegmentVersion() {

@Override
public ByteBuffer toBytes() {
return ByteBuffer.allocate(0);
return ByteBufferUtils.EMPTY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.pravega.client.stream.mock.MockConnectionFactoryImpl;
import io.pravega.client.stream.mock.MockController;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.ByteBufferUtils;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.ReplyProcessor;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void testRetry() {
connectionFactory.provideConnection(endpoint, c);

WireCommands.SegmentRead segmentRead = new WireCommands.SegmentRead(segment.getScopedName(), 1234, false, false,
ByteBuffer.allocate(0), in.getRequestId());
ByteBufferUtils.EMPTY, in.getRequestId());
Mockito.doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Expand Down Expand Up @@ -123,7 +124,7 @@ public void testRead() throws ConnectionFailedException {
connectionFactory.provideConnection(endpoint, c);

WireCommands.SegmentRead segmentRead = new WireCommands.SegmentRead(segment.getScopedName(), 1234, false, false,
ByteBuffer.allocate(0), in.getRequestId());
ByteBufferUtils.EMPTY, in.getRequestId());
CompletableFuture<SegmentRead> readFuture = in.read(1234, 5678);
AssertExtensions.assertBlocks(() -> readFuture.get(), () -> {
ReplyProcessor processor = connectionFactory.getProcessor(endpoint);
Expand Down Expand Up @@ -168,7 +169,7 @@ public void testSegmentTruncated() throws ConnectionFailedException {

//Ensure that reads at a different offset can still happen on the same instance.
WireCommands.SegmentRead segmentRead = new WireCommands.SegmentRead(segment.getScopedName(), 5656, false, false,
ByteBuffer.allocate(0), in.getRequestId());
ByteBufferUtils.EMPTY, in.getRequestId());
CompletableFuture<SegmentRead> readFuture2 = in.read(5656, 5678);
AssertExtensions.assertBlocks(() -> readFuture2.get(), () -> {
ReplyProcessor processor = connectionFactory.getProcessor(endpoint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.mock.MockConnectionFactoryImpl;
import io.pravega.client.stream.mock.MockController;
import io.pravega.common.util.ByteBufferUtils;
import io.pravega.shared.protocol.netty.ConnectionFailedException;
import io.pravega.shared.protocol.netty.PravegaNodeUri;
import io.pravega.shared.protocol.netty.ReplyProcessor;
Expand Down Expand Up @@ -76,7 +77,7 @@ public void testClose() throws SegmentSealedException {
Segment segment = new Segment("scope", "testWrite", 1);
ConditionalOutputStream cOut = factory.createConditionalOutputStream(segment, "token", EventWriterConfig.builder().build());
cOut.close();
AssertExtensions.assertThrows(IllegalStateException.class, () -> cOut.write(ByteBuffer.allocate(0), 0));
AssertExtensions.assertThrows(IllegalStateException.class, () -> cOut.write(ByteBufferUtils.EMPTY, 0));
}

@Test(timeout = 10000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public void testOrderer() throws EndOfSegmentException, SegmentTruncatedExceptio

fakeNetwork2.complete(0, new WireCommands.SegmentRead(segment.getScopedName(), 0, false, false, wireData.slice(), requestId));
fakeNetwork3.complete(0, new WireCommands.SegmentRead(segment.getScopedName(), 0, false, true, wireData.slice(), requestId));
fakeNetwork4.complete(0, new WireCommands.SegmentRead(segment.getScopedName(), 0, false, true, ByteBuffer.allocate(0), requestId));
fakeNetwork4.complete(0, new WireCommands.SegmentRead(segment.getScopedName(), 0, false, true, ByteBufferUtils.EMPTY, requestId));
fakeNetwork5.completeExceptionally(0, new SegmentTruncatedException());

Orderer o = new Orderer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.client.stream.impl;

import io.pravega.common.util.ByteBufferUtils;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.junit.Test;
Expand Down Expand Up @@ -43,7 +44,7 @@ public void testByteBufferSerializer() {
ByteBuffer subBuffer = serializer.serialize(serialized);
assertEquals(2, serializer.deserialize(subBuffer).capacity());

ByteBuffer empty = ByteBuffer.allocate(0);
ByteBuffer empty = ByteBufferUtils.EMPTY;
serialized = serializer.serialize(empty);
assertEquals(empty, serializer.deserialize(serialized));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ public void testEndOfSegment() {
writer.writeEvent(routingKey, "Bar");
Mockito.verify(controller, Mockito.times(1)).getCurrentSegments(any(), any());

assertEquals(1, outputStream2.acked.size());
assertEquals(2, outputStream2.acked.size());
assertEquals(1, outputStream2.unacked.size());
assertEquals("Foo", serializer.deserialize(outputStream2.getAcked(0)));
assertEquals("Bar", serializer.deserialize(outputStream2.getUnacked(0)));
Expand Down Expand Up @@ -347,7 +347,7 @@ public void testEndOfSegmentBackgroundRefresh() {
outputStream1.invokeSealedCallBack(); // simulate a segment sealed callback.
writer.writeEvent(routingKey, "TestData");
//This time the actual handleLogSealed is invoked and the resend method resends data to outputStream2.
assertEquals(2, outputStream2.acked.size());
assertEquals(3, outputStream2.acked.size());
assertEquals("Foo", serializer.deserialize(outputStream2.getAcked(0)));
assertEquals("Bar", serializer.deserialize(outputStream2.getAcked(1)));
assertEquals(1, outputStream2.unacked.size());
Expand Down Expand Up @@ -490,7 +490,7 @@ public void testSealInvokesFlush() {
assertEquals(0, outputStream1.acked.size());
outputStream1.invokeSealedCallBack();
assertEquals(0, outputStream2.unacked.size());
assertEquals(1, outputStream2.acked.size());
assertEquals(2, outputStream2.acked.size());
}

@Test
Expand Down Expand Up @@ -761,9 +761,9 @@ public void testSegmentSealedInSegmentSealed() {
Mockito.verify(controller, Mockito.times(1)).getCurrentSegments(any(), any());

assertEquals(0, outputStream2.acked.size());
assertEquals(1, outputStream2.unacked.size());
assertEquals(2, outputStream2.unacked.size());
assertEquals("Foo", serializer.deserialize(outputStream2.getUnacked(0)));
assertEquals(1, outputStream3.acked.size());
assertEquals(3, outputStream3.acked.size());
assertEquals(1, outputStream3.unacked.size());
assertEquals("Foo", serializer.deserialize(outputStream3.getAcked(0)));
assertEquals("Bar", serializer.deserialize(outputStream3.getUnacked(0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.nio.ByteBuffer;

public class ByteBufferUtils {

public static final ByteBuffer EMPTY = ByteBuffer.allocate(0);

public static ByteBuffer slice(ByteBuffer orig, int begin, int length) {
int pos = orig.position();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.netty.handler.codec.MessageToByteEncoder;
import io.pravega.shared.protocol.netty.WireCommands.AppendBlock;
import io.pravega.shared.protocol.netty.WireCommands.AppendBlockEnd;
import io.pravega.shared.protocol.netty.WireCommands.Flush;
import io.pravega.shared.protocol.netty.WireCommands.Padding;
import io.pravega.shared.protocol.netty.WireCommands.PartialEvent;
import io.pravega.shared.protocol.netty.WireCommands.SetupAppend;
Expand Down Expand Up @@ -94,7 +93,7 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws
segmentBeingAppendedTo = append.segment;
writeMessage(new AppendBlock(append.getRequestId(), session.id), out);
if (ctx != null) {
ctx.executor().schedule(new Flusher(ctx.channel(), currentBlockSize),
ctx.executor().schedule(new BlockTimeouter(ctx.channel(), currentBlockSize),
blockSizeSupplier.getBatchTimeout(),
TimeUnit.MILLISECONDS);
}
Expand Down Expand Up @@ -127,9 +126,9 @@ protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws
writeMessage((SetupAppend) msg, out);
SetupAppend setup = (SetupAppend) msg;
setupSegments.put(setup.getSegment(), new Session(setup.getWriterId()));
} else if (msg instanceof Flush) {
Flush flush = (Flush) msg;
if (currentBlockSize == flush.getBlockSize()) {
} else if (msg instanceof BlockTimeout) {
BlockTimeout timeoutMsg = (BlockTimeout) msg;
if (currentBlockSize == timeoutMsg.ifStillBlockSize) {
breakFromAppend(out);
}
} else if (msg instanceof WireCommand) {
Expand Down Expand Up @@ -201,13 +200,18 @@ private int writeMessage(WireCommand msg, ByteBuf out) {
}

@RequiredArgsConstructor
private static class Flusher implements Runnable {
private static final class BlockTimeout {
private final int ifStillBlockSize;
}

@RequiredArgsConstructor
private static final class BlockTimeouter implements Runnable {
private final Channel channel;
private final int blockSize;

@Override
public void run() {
channel.writeAndFlush(new Flush(blockSize));
channel.writeAndFlush(new BlockTimeout(blockSize));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1512,17 +1512,6 @@ public long getRequestId() {
}
}

@Data
public static final class Flush implements WireCommand {
final WireCommandType type = WireCommandType.KEEP_ALIVE;
private final int blockSize;

@Override
public void writeFields(DataOutput out) {
throw new IllegalStateException("This command is not sent over the wire.");
}
}

@Data
public static final class AuthTokenCheckFailed implements Reply, WireCommand {
final WireCommandType type = WireCommandType.AUTH_TOKEN_CHECK_FAILED;
Expand Down

0 comments on commit d463831

Please sign in to comment.