diff --git a/client/src/main/java/io/pravega/client/netty/impl/ClientConnectionImpl.java b/client/src/main/java/io/pravega/client/netty/impl/ClientConnectionImpl.java index c52b98e1d2b..ae04e72750c 100644 --- a/client/src/main/java/io/pravega/client/netty/impl/ClientConnectionImpl.java +++ b/client/src/main/java/io/pravega/client/netty/impl/ClientConnectionImpl.java @@ -30,17 +30,17 @@ public class ClientConnectionImpl implements ClientConnection { @Getter private final String connectionName; @Getter - private final int session; + private final int flowId; @VisibleForTesting @Getter - private final SessionHandler nettyHandler; + private final FlowHandler nettyHandler; private final AppendBatchSizeTracker batchSizeTracker; private final AtomicBoolean closed = new AtomicBoolean(false); - public ClientConnectionImpl(String connectionName, int session, AppendBatchSizeTracker batchSizeTracker, - SessionHandler nettyHandler) { + public ClientConnectionImpl(String connectionName, int flowId, AppendBatchSizeTracker batchSizeTracker, + FlowHandler nettyHandler) { this.connectionName = connectionName; - this.session = session; + this.flowId = flowId; this.batchSizeTracker = batchSizeTracker; this.nettyHandler = nettyHandler; } @@ -110,14 +110,14 @@ public void sendAsync(List appends, CompletedCallback callback) { @Override public void close() { if (!closed.getAndSet(true)) { - nettyHandler.closeSession(this); + nettyHandler.closeFlow(this); } } private void checkClientConnectionClosed() throws ConnectionFailedException { if (closed.get()) { - log.error("ClientConnection to {} with session id {} is already closed", connectionName, session); - throw new ConnectionFailedException("Client connection already closed for session " + session); + log.error("ClientConnection to {} with flow id {} is already closed", connectionName, flowId); + throw new ConnectionFailedException("Client connection already closed for flow " + flowId); } } diff --git a/client/src/main/java/io/pravega/client/netty/impl/Connection.java b/client/src/main/java/io/pravega/client/netty/impl/Connection.java index 82aee000b6b..12fd639e486 100644 --- a/client/src/main/java/io/pravega/client/netty/impl/Connection.java +++ b/client/src/main/java/io/pravega/client/netty/impl/Connection.java @@ -17,21 +17,21 @@ /** * This class represents a Connection that is established with a Segment Store instance and its - * attributes. (e.g: SessionCount, WriterCount) + * attributes. (e.g: FlowCount, WriterCount) */ @Data public class Connection { private final PravegaNodeUri uri; - private final CompletableFuture sessionHandler; + private final CompletableFuture flowHandler; /** - * Returns the number of open sessions on this connection, if the connection has been established. + * Returns the number of open flows on this connection, if the connection has been established. */ - public Optional getSessionCount() { - if (!Futures.isSuccessful(sessionHandler)) { + public Optional getFlowCount() { + if (!Futures.isSuccessful(flowHandler)) { return Optional.empty(); } - return Optional.of(sessionHandler.join().getNumOpenSession()); + return Optional.of(flowHandler.join().getOpenFlowCount()); } } diff --git a/client/src/main/java/io/pravega/client/netty/impl/ConnectionPool.java b/client/src/main/java/io/pravega/client/netty/impl/ConnectionPool.java index ebcec2582c3..8480974a344 100644 --- a/client/src/main/java/io/pravega/client/netty/impl/ConnectionPool.java +++ b/client/src/main/java/io/pravega/client/netty/impl/ConnectionPool.java @@ -30,7 +30,7 @@ public interface ConnectionPool extends AutoCloseable { CompletableFuture getClientConnection(Flow flow, PravegaNodeUri uri, ReplyProcessor rp); /** - * This is used to create a {@link ClientConnection} where sessions are disabled. This implies that only one ClientConnection + * This is used to create a {@link ClientConnection} where flows are disabled. This implies that only one ClientConnection * can exist on the underlying connection. * * @param uri The Pravega Node Uri. diff --git a/client/src/main/java/io/pravega/client/netty/impl/ConnectionPoolImpl.java b/client/src/main/java/io/pravega/client/netty/impl/ConnectionPoolImpl.java index 501d702b76a..3bdb76a33e7 100644 --- a/client/src/main/java/io/pravega/client/netty/impl/ConnectionPoolImpl.java +++ b/client/src/main/java/io/pravega/client/netty/impl/ConnectionPoolImpl.java @@ -72,8 +72,8 @@ public class ConnectionPoolImpl implements ConnectionPool { private static final Comparator COMPARATOR = new Comparator() { @Override public int compare(Connection c1, Connection c2) { - int v1 = c1.getSessionCount().orElse(Integer.MAX_VALUE); - int v2 = c2.getSessionCount().orElse(Integer.MAX_VALUE); + int v1 = c1.getFlowCount().orElse(Integer.MAX_VALUE); + int v2 = c2.getFlowCount().orElse(Integer.MAX_VALUE); return Integer.compare(v1, v2); } }; @@ -105,12 +105,12 @@ public CompletableFuture getClientConnection(Flow flow, Praveg // remove connections for which the underlying network connection is disconnected. List prunedConnectionList = connectionList.stream().filter(connection -> { // Filter out Connection objects which have been completed exceptionally or have been disconnected. - CompletableFuture r = connection.getSessionHandler(); + CompletableFuture r = connection.getFlowHandler(); return !r.isDone() || (Futures.isSuccessful(r) && r.join().isConnectionEstablished()); }).collect(Collectors.toList()); log.debug("List of connections to {} that can be used: {}", location, prunedConnectionList); - // Choose the connection with the least number of sessions. + // Choose the connection with the least number of flows. Optional suggestedConnection = prunedConnectionList.stream().min(COMPARATOR); final Connection connection; @@ -118,16 +118,16 @@ public CompletableFuture getClientConnection(Flow flow, Praveg log.debug("Reusing connection: {}", suggestedConnection.get()); Connection oldConnection = suggestedConnection.get(); prunedConnectionList.remove(oldConnection); - connection = new Connection(oldConnection.getUri(), oldConnection.getSessionHandler()); + connection = new Connection(oldConnection.getUri(), oldConnection.getFlowHandler()); } else { // create a new connection. log.debug("Creating a new connection to {}", location); - CompletableFuture sessionHandlerFuture = establishConnection(location); - connection = new Connection(location, sessionHandlerFuture); + CompletableFuture flowHandlerFuture = establishConnection(location); + connection = new Connection(location, flowHandlerFuture); } prunedConnectionList.add(connection); connectionMap.put(location, prunedConnectionList); - return connection.getSessionHandler().thenApply(sessionHandler -> sessionHandler.createSession(flow, rp)); + return connection.getFlowHandler().thenApply(flowHandler -> flowHandler.createFlow(flow, rp)); } @Override @@ -137,13 +137,13 @@ public CompletableFuture getClientConnection(PravegaNodeUri lo Exceptions.checkNotClosed(closed.get(), this); // create a new connection. - CompletableFuture sessionHandlerFuture = establishConnection(location); - Connection connection = new Connection(location, sessionHandlerFuture); - return connection.getSessionHandler().thenApply(sessionHandler -> sessionHandler.createConnectionWithSessionDisabled(rp)); + CompletableFuture flowHandlerFuture = establishConnection(location); + Connection connection = new Connection(location, flowHandlerFuture); + return connection.getFlowHandler().thenApply(flowHandler -> flowHandler.createConnectionWithFlowDisabled(rp)); } private boolean isUnused(Connection connection) { - return connection.getSessionCount().isPresent() && connection.getSessionCount().get() == 0; + return connection.getFlowCount().isPresent() && connection.getFlowCount().get() == 0; } /** @@ -155,7 +155,7 @@ public void pruneUnusedConnections() { for (Iterator iterator = connections.iterator(); iterator.hasNext();) { Connection connection = iterator.next(); if (isUnused(connection)) { - connection.getSessionHandler().join().close(); + connection.getFlowHandler().join().close(); iterator.remove(); } } @@ -178,16 +178,16 @@ public List getActiveChannels() { /** * Establish a new connection to the Pravega Node. * @param location The Pravega Node Uri - * @return A future, which completes once the connection has been established, returning a SessionHandler that can be used to create - * sessions on the connection. + * @return A future, which completes once the connection has been established, returning a FlowHandler that can be used to create + * flows on the connection. */ - private CompletableFuture establishConnection(PravegaNodeUri location) { + private CompletableFuture establishConnection(PravegaNodeUri location) { final AppendBatchSizeTracker batchSizeTracker = new AppendBatchSizeTrackerImpl(); - final SessionHandler handler = new SessionHandler(location.getEndpoint(), batchSizeTracker); + final FlowHandler handler = new FlowHandler(location.getEndpoint(), batchSizeTracker); final Bootstrap b = getNettyBootstrap().handler(getChannelInitializer(location, batchSizeTracker, handler)); // Initiate Connection. - final CompletableFuture connectionComplete = new CompletableFuture<>(); + final CompletableFuture connectionComplete = new CompletableFuture<>(); try { b.connect(location.getEndpoint(), location.getPort()).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { @@ -208,7 +208,7 @@ private CompletableFuture establishConnection(PravegaNodeUri loc final CompletableFuture channelRegisteredFuture = new CompletableFuture<>(); //to track channel registration. handler.completeWhenRegistered(channelRegisteredFuture); - return connectionComplete.thenCombine(channelRegisteredFuture, (sessionHandler, v) -> sessionHandler); + return connectionComplete.thenCombine(channelRegisteredFuture, (flowHandler, v) -> flowHandler); } /** @@ -227,7 +227,7 @@ private Bootstrap getNettyBootstrap() { */ private ChannelInitializer getChannelInitializer(final PravegaNodeUri location, final AppendBatchSizeTracker batchSizeTracker, - final SessionHandler handler) { + final FlowHandler handler) { final SslContext sslCtx = getSslContext(); return new ChannelInitializer() { diff --git a/client/src/main/java/io/pravega/client/netty/impl/SessionHandler.java b/client/src/main/java/io/pravega/client/netty/impl/FlowHandler.java similarity index 69% rename from client/src/main/java/io/pravega/client/netty/impl/SessionHandler.java rename to client/src/main/java/io/pravega/client/netty/impl/FlowHandler.java index 8c4cba5bc80..90a422c28e4 100644 --- a/client/src/main/java/io/pravega/client/netty/impl/SessionHandler.java +++ b/client/src/main/java/io/pravega/client/netty/impl/FlowHandler.java @@ -34,9 +34,9 @@ import lombok.extern.slf4j.Slf4j; @Slf4j -public class SessionHandler extends ChannelInboundHandlerAdapter implements AutoCloseable { +public class FlowHandler extends ChannelInboundHandlerAdapter implements AutoCloseable { - private static final int SESSION_DISABLED = -1; + private static final int FLOW_DISABLED = -1; private final String connectionName; private final AtomicReference channel = new AtomicReference<>(); private final AtomicReference> keepAliveFuture = new AtomicReference<>(); @@ -46,10 +46,10 @@ public class SessionHandler extends ChannelInboundHandlerAdapter implements Auto private final ReusableFutureLatch registeredFutureLatch = new ReusableFutureLatch<>(); @VisibleForTesting @Getter(AccessLevel.PACKAGE) - private final ConcurrentHashMap sessionIdReplyProcessorMap = new ConcurrentHashMap<>(); - private final AtomicBoolean disableSession = new AtomicBoolean(false); + private final ConcurrentHashMap flowIdReplyProcessorMap = new ConcurrentHashMap<>(); + private final AtomicBoolean disableFlow = new AtomicBoolean(false); - public SessionHandler(String connectionName, AppendBatchSizeTracker batchSizeTracker) { + public FlowHandler(String connectionName, AppendBatchSizeTracker batchSizeTracker) { this.connectionName = connectionName; this.batchSizeTracker = batchSizeTracker; } @@ -60,46 +60,46 @@ public SessionHandler(String connectionName, AppendBatchSizeTracker batchSizeTra * @param rp ReplyProcessor for the specified flow. * @return Client Connection object. */ - public ClientConnection createSession(final Flow flow, final ReplyProcessor rp) { + public ClientConnection createFlow(final Flow flow, final ReplyProcessor rp) { Exceptions.checkNotClosed(closed.get(), this); - Preconditions.checkState(!disableSession.get(), "Ensure sessions are enabled."); - log.info("Creating Flow {} for Endpoint {}. The current Channel is {}.", flow.getFlowId(), connectionName, channel.get()); - if (sessionIdReplyProcessorMap.put(flow.getFlowId(), rp) != null) { - throw new IllegalArgumentException("Multiple sessions cannot be created with the same Flow id " + flow.getFlowId()); + Preconditions.checkState(!disableFlow.get(), "Ensure flows are enabled."); + log.info("Creating Flow {} for endpoint {}. The current Channel is {}.", flow.getFlowId(), connectionName, channel.get()); + if (flowIdReplyProcessorMap.put(flow.getFlowId(), rp) != null) { + throw new IllegalArgumentException("Multiple flows cannot be created with the same Flow id " + flow.getFlowId()); } return new ClientConnectionImpl(connectionName, flow.getFlowId(), batchSizeTracker, this); } /** - * Create a {@link ClientConnection} where sessions are disabled. This implies that there is only one session on the underlying + * Create a {@link ClientConnection} where flows are disabled. This implies that there is only one flow on the underlying * network connection. * @param rp The ReplyProcessor. * @return Client Connection object. */ - public ClientConnection createConnectionWithSessionDisabled(final ReplyProcessor rp) { + public ClientConnection createConnectionWithFlowDisabled(final ReplyProcessor rp) { Exceptions.checkNotClosed(closed.get(), this); - Preconditions.checkState(!disableSession.getAndSet(true), "Sessions are disabled, incorrect usage pattern."); - log.info("Creating a new connection with session disabled for Endpoint {}. The current Channel is {}.", connectionName, channel.get()); - sessionIdReplyProcessorMap.put(SESSION_DISABLED, rp); - return new ClientConnectionImpl(connectionName, SESSION_DISABLED, batchSizeTracker, this); + Preconditions.checkState(!disableFlow.getAndSet(true), "Flows are disabled, incorrect usage pattern."); + log.info("Creating a new connection with flow disabled for endpoint {}. The current Channel is {}.", connectionName, channel.get()); + flowIdReplyProcessorMap.put(FLOW_DISABLED, rp); + return new ClientConnectionImpl(connectionName, FLOW_DISABLED, batchSizeTracker, this); } /** - * Close a session. This is invoked when the ClientConnection is closed. + * Close a flow. This is invoked when the ClientConnection is closed. * @param clientConnection Client Connection. */ - public void closeSession(ClientConnection clientConnection) { + public void closeFlow(ClientConnection clientConnection) { final ClientConnectionImpl clientConnectionImpl = (ClientConnectionImpl) clientConnection; - int session = clientConnectionImpl.getSession(); - log.info("Closing Flow: {} for Endpoint: {}", session, clientConnectionImpl.getConnectionName()); - sessionIdReplyProcessorMap.remove(session); + int flow = clientConnectionImpl.getFlowId(); + log.info("Closing Flow {} for endpoint {}", flow, clientConnectionImpl.getConnectionName()); + flowIdReplyProcessorMap.remove(flow); } /** - * Returns the number of open sessions. + * Returns the number of open flows. */ - public int getNumOpenSession() { - return sessionIdReplyProcessorMap.size(); + public int getOpenFlowCount() { + return flowIdReplyProcessorMap.size(); } /** @@ -145,7 +145,7 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); Channel ch = ctx.channel(); channel.set(ch); - log.info("Connection established with Endpoint: {} on ChannelId: {}.", connectionName, ch); + log.info("Connection established with endpoint {} on ChannelId {}", connectionName, ch); ch.write(new WireCommands.Hello(WireCommands.WIRE_VERSION, WireCommands.OLDEST_COMPATIBLE_VERSION), ch.voidPromise()); registeredFutureLatch.release(null); //release all futures waiting for channel registration to complete. // WireCommands.KeepAlive messages are sent for every network connection to a SegmentStore. @@ -156,7 +156,7 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } /** - * Invoke all the {@link ReplyProcessor#connectionDropped()} for all the registered sessions once the + * Invoke all the {@link ReplyProcessor#connectionDropped()} for all the registered flows once the * connection is disconnected. * * @see io.netty.channel.ChannelInboundHandler#channelUnregistered(ChannelHandlerContext) @@ -169,9 +169,9 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { future.cancel(false); } channel.set(null); - sessionIdReplyProcessorMap.forEach((sessionId, rp) -> { + flowIdReplyProcessorMap.forEach((flowId, rp) -> { rp.connectionDropped(); - log.debug("Connection dropped for session id : {}", sessionId); + log.debug("Connection dropped for flow id {}", flowId); }); super.channelUnregistered(ctx); } @@ -179,10 +179,10 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { Reply cmd = (Reply) msg; - log.debug(connectionName + " processing reply: {} with session {}.", cmd, Flow.from(cmd.getRequestId())); + log.debug(connectionName + " processing reply {} with flow {}", cmd, Flow.from(cmd.getRequestId())); if (cmd instanceof WireCommands.Hello) { - sessionIdReplyProcessorMap.forEach((sessionId, rp) -> rp.hello((WireCommands.Hello) cmd)); + flowIdReplyProcessorMap.forEach((flowId, rp) -> rp.hello((WireCommands.Hello) cmd)); return; } @@ -201,9 +201,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - sessionIdReplyProcessorMap.forEach((sessionId, rp) -> { + flowIdReplyProcessorMap.forEach((flowId, rp) -> { rp.processingFailure(new ConnectionFailedException(cause)); - log.debug("Exception observed for session id : {}", sessionId); + log.debug("Exception observed for flow id {}", flowId); }); } @@ -212,10 +212,10 @@ public void close() { if (closed.compareAndSet(false, true)) { Channel ch = channel.get(); if (ch != null) { - log.debug("Closing channel:{} ", ch); - final int openSessionCount = sessionIdReplyProcessorMap.size(); - if (openSessionCount != 0) { - log.warn("{} sessions are not closed", openSessionCount); + log.debug("Closing channel {} ", ch); + final int openFlowCount = flowIdReplyProcessorMap.size(); + if (openFlowCount != 0) { + log.warn("{} flows are not closed", openFlowCount); } ch.close(); } @@ -230,17 +230,17 @@ public void run() { Futures.getAndHandleExceptions(getChannel().writeAndFlush(new WireCommands.KeepAlive()), ConnectionFailedException::new); } } catch (Exception e) { - log.warn("Keep alive failed, killing connection {} due to {} ", connectionName, e.getMessage()); + log.warn("Keep alive failed, killing connection {} due to {}", connectionName, e.getMessage()); close(); } } } private Optional getReplyProcessor(Reply cmd) { - int sessionId = disableSession.get() ? SESSION_DISABLED : Flow.from(cmd.getRequestId()).getFlowId(); - final ReplyProcessor processor = sessionIdReplyProcessorMap.get(sessionId); + int flowId = disableFlow.get() ? FLOW_DISABLED : Flow.from(cmd.getRequestId()).getFlowId(); + final ReplyProcessor processor = flowIdReplyProcessorMap.get(flowId); if (processor == null) { - log.warn("No ReplyProcessor found for the provided sessionId {}. Ignoring response", sessionId); + log.warn("No ReplyProcessor found for the provided flowId {}. Ignoring response", flowId); } return Optional.ofNullable(processor); } diff --git a/client/src/test/java/io/pravega/client/netty/impl/ConnectionPoolingTest.java b/client/src/test/java/io/pravega/client/netty/impl/ConnectionPoolingTest.java index 11c5815103b..c7dd011c890 100644 --- a/client/src/test/java/io/pravega/client/netty/impl/ConnectionPoolingTest.java +++ b/client/src/test/java/io/pravega/client/netty/impl/ConnectionPoolingTest.java @@ -70,10 +70,10 @@ public class ConnectionPoolingTest { private final long offset = 1234L; private final int length = 1024; private final String data = "data"; - private Function readRequestGenerator = session -> - new WireCommands.ReadSegment(seg, offset, length, "", session); - private Function readResponseGenerator = session -> - new WireCommands.SegmentRead(seg, offset, true, false, ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)), session); + private Function readRequestGenerator = id -> + new WireCommands.ReadSegment(seg, offset, length, "", id); + private Function readResponseGenerator = id -> + new WireCommands.SegmentRead(seg, offset, true, false, ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)), id); private class EchoServerHandler extends ChannelInboundHandlerAdapter { diff --git a/client/src/test/java/io/pravega/client/netty/impl/FlowHandlerTest.java b/client/src/test/java/io/pravega/client/netty/impl/FlowHandlerTest.java index 5f4bf3ab58d..ce2117ef18e 100644 --- a/client/src/test/java/io/pravega/client/netty/impl/FlowHandlerTest.java +++ b/client/src/test/java/io/pravega/client/netty/impl/FlowHandlerTest.java @@ -58,7 +58,7 @@ public class FlowHandlerTest { public Timeout globalTimeout = Timeout.seconds(15); private Flow flow; - private SessionHandler sessionHandler; + private FlowHandler flowHandler; private ClientConnection clientConnection; @Mock private ReplyProcessor processor; @@ -93,15 +93,15 @@ public void setUp() throws Exception { when(ch.eventLoop()).thenReturn(loop); when(ch.writeAndFlush(any(Object.class))).thenReturn(completedFuture); - sessionHandler = new SessionHandler("testConnection", tracker); - clientConnection = sessionHandler.createSession(flow, processor); + flowHandler = new FlowHandler("testConnection", tracker); + clientConnection = flowHandler.createFlow(flow, processor); } @Test public void sendNormal() throws Exception { // channelRegistered is invoked before send is invoked. // No exceptions are expected here. - sessionHandler.channelRegistered(ctx); + flowHandler.channelRegistered(ctx); clientConnection.send(appendCmd); } @@ -115,36 +115,36 @@ public void sendError() throws Exception { @Test(expected = ConnectionFailedException.class) public void sendErrorUnRegistered() throws Exception { //any send after channelUnregistered should throw a ConnectionFailedException. - sessionHandler.channelRegistered(ctx); - sessionHandler.channelUnregistered(ctx); + flowHandler.channelRegistered(ctx); + flowHandler.channelUnregistered(ctx); clientConnection.send(appendCmd); } @Test public void completeWhenRegisteredNormal() throws Exception { - sessionHandler.channelRegistered(ctx); + flowHandler.channelRegistered(ctx); CompletableFuture testFuture = new CompletableFuture<>(); - sessionHandler.completeWhenRegistered(testFuture); + flowHandler.completeWhenRegistered(testFuture); Assert.assertTrue(Futures.isSuccessful(testFuture)); } @Test public void completeWhenRegisteredDelayed() throws Exception { CompletableFuture testFuture = new CompletableFuture<>(); - sessionHandler.completeWhenRegistered(testFuture); - sessionHandler.channelRegistered(ctx); + flowHandler.completeWhenRegistered(testFuture); + flowHandler.channelRegistered(ctx); Assert.assertTrue(Futures.isSuccessful(testFuture)); } @Test public void completeWhenRegisteredDelayedMultiple() throws Exception { CompletableFuture testFuture = new CompletableFuture<>(); - sessionHandler.completeWhenRegistered(testFuture); + flowHandler.completeWhenRegistered(testFuture); CompletableFuture testFuture1 = new CompletableFuture<>(); - sessionHandler.completeWhenRegistered(testFuture1); + flowHandler.completeWhenRegistered(testFuture1); - sessionHandler.channelRegistered(ctx); + flowHandler.channelRegistered(ctx); Assert.assertTrue(Futures.isSuccessful(testFuture)); testFuture1.get(); //wait until additional future is complete. @@ -154,49 +154,49 @@ public void completeWhenRegisteredDelayedMultiple() throws Exception { @Test(expected = IllegalArgumentException.class) public void createDuplicateSession() throws Exception { Flow flow = new Flow(11, 0); - ClientConnection connection1 = sessionHandler.createSession(flow, processor); - sessionHandler.channelRegistered(ctx); + ClientConnection connection1 = flowHandler.createFlow(flow, processor); + flowHandler.channelRegistered(ctx); connection1.send(appendCmd); // Creating a flow with the same flow id. - sessionHandler.createSession(flow, processor); + flowHandler.createFlow(flow, processor); } @Test public void testCloseSession() throws Exception { - sessionHandler.channelRegistered(ctx); + flowHandler.channelRegistered(ctx); clientConnection.send(appendCmd); - sessionHandler.closeSession(clientConnection); - assertEquals(0, sessionHandler.getSessionIdReplyProcessorMap().size()); + flowHandler.closeFlow(clientConnection); + assertEquals(0, flowHandler.getFlowIdReplyProcessorMap().size()); } @Test public void testCloseSessionHandler() throws Exception { - sessionHandler.channelRegistered(ctx); + flowHandler.channelRegistered(ctx); WireCommands.GetSegmentAttribute cmd = new WireCommands.GetSegmentAttribute(flow.asLong(), "seg", UUID.randomUUID(), ""); clientConnection.sendAsync(cmd, e -> fail("Exception while invoking sendAsync")); - sessionHandler.close(); + flowHandler.close(); // verify that the Channel.close is invoked. Mockito.verify(ch, times(1)).close(); - assertThrows(ObjectClosedException.class, () -> sessionHandler.createSession(flow, processor)); - assertThrows(ObjectClosedException.class, () -> sessionHandler.createConnectionWithSessionDisabled(processor)); + assertThrows(ObjectClosedException.class, () -> flowHandler.createFlow(flow, processor)); + assertThrows(ObjectClosedException.class, () -> flowHandler.createConnectionWithFlowDisabled(processor)); } @Test public void testCreateConnectionWithSessionDisabled() throws Exception { - sessionHandler = new SessionHandler("testConnection1", tracker); - sessionHandler.channelRegistered(ctx); - ClientConnection connection = sessionHandler.createConnectionWithSessionDisabled(processor); + flowHandler = new FlowHandler("testConnection1", tracker); + flowHandler.channelRegistered(ctx); + ClientConnection connection = flowHandler.createConnectionWithFlowDisabled(processor); connection.send(appendCmd); - assertThrows(IllegalStateException.class, () -> sessionHandler.createSession(flow, processor)); + assertThrows(IllegalStateException.class, () -> flowHandler.createFlow(flow, processor)); } @Test public void testChannelUnregistered() throws Exception { - sessionHandler.channelRegistered(ctx); + flowHandler.channelRegistered(ctx); clientConnection.send(appendCmd); //simulate a connection dropped - sessionHandler.channelUnregistered(ctx); - assertFalse(sessionHandler.isConnectionEstablished()); + flowHandler.channelUnregistered(ctx); + assertFalse(flowHandler.isConnectionEstablished()); assertThrows(ConnectionFailedException.class, () -> clientConnection.send(appendCmd)); WireCommands.GetSegmentAttribute cmd = new WireCommands.GetSegmentAttribute(flow.asLong(), "seg", UUID.randomUUID(), ""); clientConnection.sendAsync(cmd, Assert::assertNotNull); @@ -207,8 +207,8 @@ public void testChannelUnregistered() throws Exception { public void testChannelReadWithHello() throws Exception { WireCommands.Hello helloCmd = new WireCommands.Hello(8, 4); InOrder order = inOrder(processor); - sessionHandler.channelRegistered(ctx); - sessionHandler.channelRead(ctx, helloCmd); + flowHandler.channelRegistered(ctx); + flowHandler.channelRead(ctx, helloCmd); order.verify(processor, times(1)).hello(helloCmd); } @@ -217,8 +217,8 @@ public void testChannelReadWithHello() throws Exception { public void testChannelReadDataAppended() throws Exception { WireCommands.DataAppended dataAppendedCmd = new WireCommands.DataAppended(flow.asLong(), UUID.randomUUID(), 2, 1); InOrder order = inOrder(processor); - sessionHandler.channelRegistered(ctx); - sessionHandler.channelRead(ctx, dataAppendedCmd); + flowHandler.channelRegistered(ctx); + flowHandler.channelRead(ctx, dataAppendedCmd); order.verify(processor, times(1)).process(dataAppendedCmd); } } diff --git a/shared/protocol/src/main/java/io/pravega/shared/protocol/netty/Append.java b/shared/protocol/src/main/java/io/pravega/shared/protocol/netty/Append.java index 8dfe3f94134..b4397444c37 100644 --- a/shared/protocol/src/main/java/io/pravega/shared/protocol/netty/Append.java +++ b/shared/protocol/src/main/java/io/pravega/shared/protocol/netty/Append.java @@ -22,24 +22,24 @@ public class Append implements Request, Comparable { final int eventCount; final ByteBuf data; final Long expectedLength; - final long sessionId; + final long flowId; - public Append(String segment, UUID writerId, long eventNumber, Event event, long sessionId) { - this(segment, writerId, eventNumber, 1, event.getAsByteBuf(), null, sessionId); + public Append(String segment, UUID writerId, long eventNumber, Event event, long flowId) { + this(segment, writerId, eventNumber, 1, event.getAsByteBuf(), null, flowId); } - public Append(String segment, UUID writerId, long eventNumber, Event event, long expectedLength, long sessionId) { - this(segment, writerId, eventNumber, 1, event.getAsByteBuf(), expectedLength, sessionId); + public Append(String segment, UUID writerId, long eventNumber, Event event, long expectedLength, long flowId) { + this(segment, writerId, eventNumber, 1, event.getAsByteBuf(), expectedLength, flowId); } - public Append(String segment, UUID writerId, long eventNumber, int eventCount, ByteBuf data, Long expectedLength, long sessionId) { + public Append(String segment, UUID writerId, long eventNumber, int eventCount, ByteBuf data, Long expectedLength, long flowId) { this.segment = segment; this.writerId = writerId; this.eventNumber = eventNumber; this.eventCount = eventCount; this.data = data; this.expectedLength = expectedLength; - this.sessionId = sessionId; + this.flowId = flowId; } public int getDataLength() { @@ -62,6 +62,6 @@ public int compareTo(Append other) { @Override public long getRequestId() { - return sessionId; + return flowId; } }