Skip to content

Commit

Permalink
Issue 3619: Rename session to flow. (pravega#3622)
Browse files Browse the repository at this point in the history
* Rename session reference to flow.

Signed-off-by: Sandeep <[email protected]>
  • Loading branch information
shrids authored and fpj committed Apr 12, 2019
1 parent 938123e commit a64dc1d
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,17 @@ public class ClientConnectionImpl implements ClientConnection {
@Getter
private final String connectionName;
@Getter
private final int session;
private final int flowId;
@VisibleForTesting
@Getter
private final SessionHandler nettyHandler;
private final FlowHandler nettyHandler;
private final AppendBatchSizeTracker batchSizeTracker;
private final AtomicBoolean closed = new AtomicBoolean(false);

public ClientConnectionImpl(String connectionName, int session, AppendBatchSizeTracker batchSizeTracker,
SessionHandler nettyHandler) {
public ClientConnectionImpl(String connectionName, int flowId, AppendBatchSizeTracker batchSizeTracker,
FlowHandler nettyHandler) {
this.connectionName = connectionName;
this.session = session;
this.flowId = flowId;
this.batchSizeTracker = batchSizeTracker;
this.nettyHandler = nettyHandler;
}
Expand Down Expand Up @@ -110,14 +110,14 @@ public void sendAsync(List<Append> appends, CompletedCallback callback) {
@Override
public void close() {
if (!closed.getAndSet(true)) {
nettyHandler.closeSession(this);
nettyHandler.closeFlow(this);
}
}

private void checkClientConnectionClosed() throws ConnectionFailedException {
if (closed.get()) {
log.error("ClientConnection to {} with session id {} is already closed", connectionName, session);
throw new ConnectionFailedException("Client connection already closed for session " + session);
log.error("ClientConnection to {} with flow id {} is already closed", connectionName, flowId);
throw new ConnectionFailedException("Client connection already closed for flow " + flowId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@

/**
* This class represents a Connection that is established with a Segment Store instance and its
* attributes. (e.g: SessionCount, WriterCount)
* attributes. (e.g: FlowCount, WriterCount)
*/
@Data
public class Connection {
private final PravegaNodeUri uri;
private final CompletableFuture<SessionHandler> sessionHandler;
private final CompletableFuture<FlowHandler> flowHandler;

/**
* Returns the number of open sessions on this connection, if the connection has been established.
* Returns the number of open flows on this connection, if the connection has been established.
*/
public Optional<Integer> getSessionCount() {
if (!Futures.isSuccessful(sessionHandler)) {
public Optional<Integer> getFlowCount() {
if (!Futures.isSuccessful(flowHandler)) {
return Optional.empty();
}
return Optional.of(sessionHandler.join().getNumOpenSession());
return Optional.of(flowHandler.join().getOpenFlowCount());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface ConnectionPool extends AutoCloseable {
CompletableFuture<ClientConnection> getClientConnection(Flow flow, PravegaNodeUri uri, ReplyProcessor rp);

/**
* This is used to create a {@link ClientConnection} where sessions are disabled. This implies that only one ClientConnection
* This is used to create a {@link ClientConnection} where flows are disabled. This implies that only one ClientConnection
* can exist on the underlying connection.
*
* @param uri The Pravega Node Uri.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public class ConnectionPoolImpl implements ConnectionPool {
private static final Comparator<Connection> COMPARATOR = new Comparator<Connection>() {
@Override
public int compare(Connection c1, Connection c2) {
int v1 = c1.getSessionCount().orElse(Integer.MAX_VALUE);
int v2 = c2.getSessionCount().orElse(Integer.MAX_VALUE);
int v1 = c1.getFlowCount().orElse(Integer.MAX_VALUE);
int v2 = c2.getFlowCount().orElse(Integer.MAX_VALUE);
return Integer.compare(v1, v2);
}
};
Expand Down Expand Up @@ -105,29 +105,29 @@ public CompletableFuture<ClientConnection> getClientConnection(Flow flow, Praveg
// remove connections for which the underlying network connection is disconnected.
List<Connection> prunedConnectionList = connectionList.stream().filter(connection -> {
// Filter out Connection objects which have been completed exceptionally or have been disconnected.
CompletableFuture<SessionHandler> r = connection.getSessionHandler();
CompletableFuture<FlowHandler> r = connection.getFlowHandler();
return !r.isDone() || (Futures.isSuccessful(r) && r.join().isConnectionEstablished());
}).collect(Collectors.toList());
log.debug("List of connections to {} that can be used: {}", location, prunedConnectionList);

// Choose the connection with the least number of sessions.
// Choose the connection with the least number of flows.
Optional<Connection> suggestedConnection = prunedConnectionList.stream().min(COMPARATOR);

final Connection connection;
if (suggestedConnection.isPresent() && (prunedConnectionList.size() >= clientConfig.getMaxConnectionsPerSegmentStore() || isUnused(suggestedConnection.get()))) {
log.debug("Reusing connection: {}", suggestedConnection.get());
Connection oldConnection = suggestedConnection.get();
prunedConnectionList.remove(oldConnection);
connection = new Connection(oldConnection.getUri(), oldConnection.getSessionHandler());
connection = new Connection(oldConnection.getUri(), oldConnection.getFlowHandler());
} else {
// create a new connection.
log.debug("Creating a new connection to {}", location);
CompletableFuture<SessionHandler> sessionHandlerFuture = establishConnection(location);
connection = new Connection(location, sessionHandlerFuture);
CompletableFuture<FlowHandler> flowHandlerFuture = establishConnection(location);
connection = new Connection(location, flowHandlerFuture);
}
prunedConnectionList.add(connection);
connectionMap.put(location, prunedConnectionList);
return connection.getSessionHandler().thenApply(sessionHandler -> sessionHandler.createSession(flow, rp));
return connection.getFlowHandler().thenApply(flowHandler -> flowHandler.createFlow(flow, rp));
}

@Override
Expand All @@ -137,13 +137,13 @@ public CompletableFuture<ClientConnection> getClientConnection(PravegaNodeUri lo
Exceptions.checkNotClosed(closed.get(), this);

// create a new connection.
CompletableFuture<SessionHandler> sessionHandlerFuture = establishConnection(location);
Connection connection = new Connection(location, sessionHandlerFuture);
return connection.getSessionHandler().thenApply(sessionHandler -> sessionHandler.createConnectionWithSessionDisabled(rp));
CompletableFuture<FlowHandler> flowHandlerFuture = establishConnection(location);
Connection connection = new Connection(location, flowHandlerFuture);
return connection.getFlowHandler().thenApply(flowHandler -> flowHandler.createConnectionWithFlowDisabled(rp));
}

private boolean isUnused(Connection connection) {
return connection.getSessionCount().isPresent() && connection.getSessionCount().get() == 0;
return connection.getFlowCount().isPresent() && connection.getFlowCount().get() == 0;
}

/**
Expand All @@ -155,7 +155,7 @@ public void pruneUnusedConnections() {
for (Iterator<Connection> iterator = connections.iterator(); iterator.hasNext();) {
Connection connection = iterator.next();
if (isUnused(connection)) {
connection.getSessionHandler().join().close();
connection.getFlowHandler().join().close();
iterator.remove();
}
}
Expand All @@ -178,16 +178,16 @@ public List<Channel> getActiveChannels() {
/**
* Establish a new connection to the Pravega Node.
* @param location The Pravega Node Uri
* @return A future, which completes once the connection has been established, returning a SessionHandler that can be used to create
* sessions on the connection.
* @return A future, which completes once the connection has been established, returning a FlowHandler that can be used to create
* flows on the connection.
*/
private CompletableFuture<SessionHandler> establishConnection(PravegaNodeUri location) {
private CompletableFuture<FlowHandler> establishConnection(PravegaNodeUri location) {
final AppendBatchSizeTracker batchSizeTracker = new AppendBatchSizeTrackerImpl();
final SessionHandler handler = new SessionHandler(location.getEndpoint(), batchSizeTracker);
final FlowHandler handler = new FlowHandler(location.getEndpoint(), batchSizeTracker);
final Bootstrap b = getNettyBootstrap().handler(getChannelInitializer(location, batchSizeTracker, handler));

// Initiate Connection.
final CompletableFuture<SessionHandler> connectionComplete = new CompletableFuture<>();
final CompletableFuture<FlowHandler> connectionComplete = new CompletableFuture<>();
try {
b.connect(location.getEndpoint(), location.getPort()).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
Expand All @@ -208,7 +208,7 @@ private CompletableFuture<SessionHandler> establishConnection(PravegaNodeUri loc
final CompletableFuture<Void> channelRegisteredFuture = new CompletableFuture<>(); //to track channel registration.
handler.completeWhenRegistered(channelRegisteredFuture);

return connectionComplete.thenCombine(channelRegisteredFuture, (sessionHandler, v) -> sessionHandler);
return connectionComplete.thenCombine(channelRegisteredFuture, (flowHandler, v) -> flowHandler);
}

/**
Expand All @@ -227,7 +227,7 @@ private Bootstrap getNettyBootstrap() {
*/
private ChannelInitializer<SocketChannel> getChannelInitializer(final PravegaNodeUri location,
final AppendBatchSizeTracker batchSizeTracker,
final SessionHandler handler) {
final FlowHandler handler) {
final SslContext sslCtx = getSslContext();

return new ChannelInitializer<SocketChannel>() {
Expand Down
Loading

0 comments on commit a64dc1d

Please sign in to comment.