> getAllManagedConnections();
+
+ /**
+ * check connection, the address format example - 127.0.0.1:12200?key1=value1&key2=value2
+ *
+ * @param address target address
+ * @return true if and only if there is a connection, and the connection is active and writable;else return false
+ */
+ boolean checkConnection(String address);
+
+ /**
+ * Close all connections of a address
+ *
+ * @param address target address
+ */
+ void closeConnection(String address);
+
+ /**
+ * Close all connections of a {@link Url}
+ *
+ * @param url target url
+ */
+ void closeConnection(Url url);
+
+ /**
+ * Enable heart beat for a certain connection.
+ * If this address not connected, then do nothing.
+ *
+ * Notice: this method takes no effect on a stand alone connection.
+ *
+ * @param address target address
+ */
+ void enableConnHeartbeat(String address);
+
+ /**
+ * Enable heart beat for a certain connection.
+ * If this {@link Url} not connected, then do nothing.
+ *
+ * Notice: this method takes no effect on a stand alone connection.
+ *
+ * @param url target url
+ */
+ void enableConnHeartbeat(Url url);
+
+ /**
+ * Disable heart beat for a certain connection.
+ * If this addr not connected, then do nothing.
+ *
+ * Notice: this method takes no effect on a stand alone connection.
+ *
+ * @param address target address
+ */
+ void disableConnHeartbeat(String address);
+
+ /**
+ * Disable heart beat for a certain connection.
+ * If this {@link Url} not connected, then do nothing.
+ *
+ * Notice: this method takes no effect on a stand alone connection.
+ *
+ * @param url target url
+ */
+ void disableConnHeartbeat(Url url);
+
+ /**
+ * enable connection reconnect switch on
+ *
+ * Notice: This api should be called before {@link RpcClient#init()}
+ */
+ void enableReconnectSwitch();
+
+ /**
+ * disable connection reconnect switch off
+ *
+ * Notice: This api should be called before {@link RpcClient#init()}
+ */
+ void disableReconnectSwith();
+
+ /**
+ * is reconnect switch on
+ */
+ boolean isReconnectSwitchOn();
+
+ /**
+ * enable connection monitor switch on
+ */
+ void enableConnectionMonitorSwitch();
+
+ /**
+ * disable connection monitor switch off
+ *
+ * Notice: This api should be called before {@link RpcClient#init()}
+ */
+ void disableConnectionMonitorSwitch();
+
+ /**
+ * is connection monitor switch on
+ */
+ boolean isConnectionMonitorSwitchOn();
+
+ /**
+ * Getter method for property connectionManager.
+ *
+ * @return property value of connectionManager
+ */
+ DefaultConnectionManager getConnectionManager();
+
+ /**
+ * Getter method for property addressParser.
+ *
+ * @return property value of addressParser
+ */
+ RemotingAddressParser getAddressParser();
+
+ /**
+ * Setter method for property addressParser.
+ *
+ * @param addressParser value to be assigned to property addressParser
+ */
+ void setAddressParser(RemotingAddressParser addressParser);
+
+ /**
+ * Setter method for property monitorStrategy.
+ *
+ * @param monitorStrategy value to be assigned to property monitorStrategy
+ */
+ void setMonitorStrategy(ConnectionMonitorStrategy monitorStrategy);
+}
diff --git a/src/main/java/com/alipay/remoting/ClientConnectionManager.java b/src/main/java/com/alipay/remoting/ClientConnectionManager.java
new file mode 100644
index 00000000..aa79f0ed
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/ClientConnectionManager.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting;
+
+/**
+ * Connection manager in client side.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2019-03-07 12:12
+ */
+public interface ClientConnectionManager extends ConnectionManager, LifeCycle {
+
+}
diff --git a/src/main/java/com/alipay/remoting/Connection.java b/src/main/java/com/alipay/remoting/Connection.java
index 0624b446..ecaf96a5 100644
--- a/src/main/java/com/alipay/remoting/Connection.java
+++ b/src/main/java/com/alipay/remoting/Connection.java
@@ -95,7 +95,7 @@ public class Connection {
/**
* Constructor
*
- * @param channel
+ * @param channel associated channel
*/
public Connection(Channel channel) {
this.channel = channel;
@@ -105,8 +105,8 @@ public Connection(Channel channel) {
/**
* Constructor
*
- * @param channel
- * @param url
+ * @param channel associated channel
+ * @param url associated url
*/
public Connection(Channel channel, Url url) {
this(channel);
@@ -117,9 +117,9 @@ public Connection(Channel channel, Url url) {
/**
* Constructor
*
- * @param channel
- * @param protocolCode
- * @param url
+ * @param channel associated channel
+ * @param protocolCode ProtocolCode
+ * @param url associated url
*/
public Connection(Channel channel, ProtocolCode protocolCode, Url url) {
this(channel, url);
@@ -128,11 +128,11 @@ public Connection(Channel channel, ProtocolCode protocolCode, Url url) {
}
/**
- * Constructor
*
- * @param channel
- * @param protocolCode
- * @param url
+ * @param channel associated channel
+ * @param protocolCode ProtocolCode
+ * @param version protocol version
+ * @param url associated url
*/
public Connection(Channel channel, ProtocolCode protocolCode, byte version, Url url) {
this(channel, url);
@@ -145,7 +145,7 @@ public Connection(Channel channel, ProtocolCode protocolCode, byte version, Url
* Initialization.
*/
private void init() {
- this.channel.attr(HEARTBEAT_COUNT).set(new Integer(0));
+ this.channel.attr(HEARTBEAT_COUNT).set(0);
this.channel.attr(PROTOCOL).set(this.protocolCode);
this.channel.attr(VERSION).set(this.version);
this.channel.attr(HEARTBEAT_SWITCH).set(true);
@@ -154,7 +154,7 @@ private void init() {
/**
* to check whether the connection is fine to use
*
- * @return
+ * @return true if connection is fine
*/
public boolean isFine() {
return this.channel != null && this.channel.isActive();
@@ -177,7 +177,7 @@ public void decreaseRef() {
/**
* to check whether the reference count is 0
*
- * @return
+ * @return true if the reference count is 0
*/
public boolean noRef() {
return this.referenceCount.get() == NO_REFERENCE;
@@ -186,7 +186,7 @@ public boolean noRef() {
/**
* Get the address of the remote peer.
*
- * @return
+ * @return remote address
*/
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) this.channel.remoteAddress();
@@ -195,7 +195,7 @@ public InetSocketAddress getRemoteAddress() {
/**
* Get the remote IP.
*
- * @return
+ * @return remote IP
*/
public String getRemoteIP() {
return RemotingUtil.parseRemoteIP(this.channel);
@@ -204,7 +204,7 @@ public String getRemoteIP() {
/**
* Get the remote port.
*
- * @return
+ * @return remote port
*/
public int getRemotePort() {
return RemotingUtil.parseRemotePort(this.channel);
@@ -213,7 +213,7 @@ public int getRemotePort() {
/**
* Get the address of the local peer.
*
- * @return
+ * @return local address
*/
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) this.channel.localAddress();
@@ -222,7 +222,7 @@ public InetSocketAddress getLocalAddress() {
/**
* Get the local IP.
*
- * @return
+ * @return local IP
*/
public String getLocalIP() {
return RemotingUtil.parseLocalIP(this.channel);
@@ -231,7 +231,7 @@ public String getLocalIP() {
/**
* Get the local port.
*
- * @return
+ * @return local port
*/
public int getLocalPort() {
return RemotingUtil.parseLocalPort(this.channel);
@@ -240,7 +240,7 @@ public int getLocalPort() {
/**
* Get the netty channel of the connection.
*
- * @return
+ * @return Channel
*/
public Channel getChannel() {
return this.channel;
@@ -249,8 +249,8 @@ public Channel getChannel() {
/**
* Get the InvokeFuture with invokeId of id.
*
- * @param id
- * @return
+ * @param id invoke id
+ * @return InvokeFuture
*/
public InvokeFuture getInvokeFuture(int id) {
return this.invokeFutureMap.get(id);
@@ -259,8 +259,8 @@ public InvokeFuture getInvokeFuture(int id) {
/**
* Add an InvokeFuture
*
- * @param future
- * @return
+ * @param future InvokeFuture
+ * @return previous InvokeFuture with same invoke id
*/
public InvokeFuture addInvokeFuture(InvokeFuture future) {
return this.invokeFutureMap.putIfAbsent(future.invokeId(), future);
@@ -269,8 +269,8 @@ public InvokeFuture addInvokeFuture(InvokeFuture future) {
/**
* Remove InvokeFuture who's invokeId is id
*
- * @param id
- * @return
+ * @param id invoke id
+ * @return associated InvokerFuture with the target id
*/
public InvokeFuture removeInvokeFuture(int id) {
return this.invokeFutureMap.remove(id);
@@ -333,7 +333,7 @@ public boolean isInvokeFutureMapFinish() {
/**
* add a pool key to list
*
- * @param poolKey
+ * @param poolKey connection pool key
*/
public void addPoolKey(String poolKey) {
poolKeys.add(poolKey);
@@ -349,7 +349,7 @@ public Set getPoolKeys() {
/**
* remove pool key
*
- * @param poolKey
+ * @param poolKey connection pool key
*/
public void removePoolKey(String poolKey) {
poolKeys.remove(poolKey);
@@ -367,8 +367,8 @@ public Url getUrl() {
/**
* add Id to group Mapping
*
- * @param id
- * @param poolKey
+ * @param id invoke id
+ * @param poolKey connection pool key
*/
public void addIdPoolKeyMapping(Integer id, String poolKey) {
this.id2PoolKey.put(id, poolKey);
@@ -377,8 +377,8 @@ public void addIdPoolKeyMapping(Integer id, String poolKey) {
/**
* remove id to group Mapping
*
- * @param id
- * @return
+ * @param id connection pool key
+ * @return connection pool key
*/
public String removeIdPoolKeyMapping(Integer id) {
return this.id2PoolKey.remove(id);
@@ -387,8 +387,8 @@ public String removeIdPoolKeyMapping(Integer id) {
/**
* Set attribute key=value.
*
- * @param key
- * @param value
+ * @param key attribute key
+ * @param value attribute value
*/
public void setAttribute(String key, Object value) {
attributes.put(key, value);
@@ -397,9 +397,9 @@ public void setAttribute(String key, Object value) {
/**
* set attribute if key absent.
*
- * @param key
- * @param value
- * @return
+ * @param key attribute key
+ * @param value attribute value
+ * @return previous value
*/
public Object setAttributeIfAbsent(String key, Object value) {
return attributes.putIfAbsent(key, value);
@@ -408,7 +408,7 @@ public Object setAttributeIfAbsent(String key, Object value) {
/**
* Remove attribute.
*
- * @param key
+ * @param key attribute key
*/
public void removeAttribute(String key) {
attributes.remove(key);
@@ -417,8 +417,8 @@ public void removeAttribute(String key) {
/**
* Get attribute.
*
- * @param key
- * @return
+ * @param key attribute key
+ * @return attribute value
*/
public Object getAttribute(String key) {
return attributes.get(key);
diff --git a/src/main/java/com/alipay/remoting/ConnectionEventHandler.java b/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
index 8a18d5f6..d1990be0 100644
--- a/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
+++ b/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
@@ -52,7 +52,7 @@ public class ConnectionEventHandler extends ChannelDuplexHandler {
private ConnectionEventExecutor eventExecutor;
- private ReconnectManager reconnectManager;
+ private Reconnector reconnectManager;
private GlobalSwitch globalSwitch;
@@ -144,7 +144,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
&& this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
- reconnectManager.addReconnectTask(conn.getUrl());
+ reconnectManager.reconnect(conn.getUrl());
}
}
// trigger close connection event
@@ -168,7 +168,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws E
}
break;
default:
- return;
+ break;
}
} else {
super.userEventTriggered(ctx, event);
@@ -186,12 +186,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.channel().close();
}
- /**
- *
- * @param conn
- * @param remoteAddress
- * @param type
- */
private void onEvent(final Connection conn, final String remoteAddress,
final ConnectionEventType type) {
if (this.eventListener != null) {
@@ -246,14 +240,18 @@ public void setConnectionManager(ConnectionManager connectionManager) {
}
/**
- * Setter method for property reconnectManager.
- *
+ * please use {@link ConnectionEventHandler#setReconnector(Reconnector)} instead
* @param reconnectManager value to be assigned to property reconnectManager
*/
+ @Deprecated
public void setReconnectManager(ReconnectManager reconnectManager) {
this.reconnectManager = reconnectManager;
}
+ public void setReconnector(Reconnector reconnector) {
+ this.reconnectManager = reconnector;
+ }
+
/**
* Dispatch connection event.
*
@@ -269,22 +267,17 @@ public class ConnectionEventExecutor {
/**
* Process event.
*
- * @param event
+ * @param runnable Runnable
*/
- public void onEvent(Runnable event) {
+ public void onEvent(Runnable runnable) {
try {
- executor.execute(event);
+ executor.execute(runnable);
} catch (Throwable t) {
logger.error("Exception caught when execute connection event!", t);
}
}
}
- /**
- * print info log
- * @param format
- * @param addr
- */
private void infoLog(String format, String addr) {
if (logger.isInfoEnabled()) {
if (StringUtils.isNotEmpty(addr)) {
diff --git a/src/main/java/com/alipay/remoting/ConnectionEventListener.java b/src/main/java/com/alipay/remoting/ConnectionEventListener.java
index 2f5d37be..f91ca1ad 100644
--- a/src/main/java/com/alipay/remoting/ConnectionEventListener.java
+++ b/src/main/java/com/alipay/remoting/ConnectionEventListener.java
@@ -33,15 +33,15 @@ public class ConnectionEventListener {
/**
* Dispatch events.
*
- * @param type
- * @param remoteAddr
- * @param conn
+ * @param type ConnectionEventType
+ * @param remoteAddress remoting address
+ * @param connection Connection
*/
- public void onEvent(ConnectionEventType type, String remoteAddr, Connection conn) {
+ public void onEvent(ConnectionEventType type, String remoteAddress, Connection connection) {
List processorList = this.processors.get(type);
if (processorList != null) {
for (ConnectionEventProcessor processor : processorList) {
- processor.onEvent(remoteAddr, conn);
+ processor.onEvent(remoteAddress, connection);
}
}
}
@@ -49,8 +49,8 @@ public void onEvent(ConnectionEventType type, String remoteAddr, Connection conn
/**
* Add event processor.
*
- * @param type
- * @param processor
+ * @param type ConnectionEventType
+ * @param processor ConnectionEventProcessor
*/
public void addConnectionEventProcessor(ConnectionEventType type,
ConnectionEventProcessor processor) {
diff --git a/src/main/java/com/alipay/remoting/ConnectionEventProcessor.java b/src/main/java/com/alipay/remoting/ConnectionEventProcessor.java
index 995f73dd..099ca23c 100644
--- a/src/main/java/com/alipay/remoting/ConnectionEventProcessor.java
+++ b/src/main/java/com/alipay/remoting/ConnectionEventProcessor.java
@@ -25,8 +25,8 @@ public interface ConnectionEventProcessor {
/**
* Process event.
*
- * @param remoteAddr
- * @param conn
+ * @param remoteAddress remoting connection
+ * @param connection Connection
*/
- void onEvent(String remoteAddr, Connection conn);
+ void onEvent(String remoteAddress, Connection connection);
}
diff --git a/src/main/java/com/alipay/remoting/ConnectionHeartbeatManager.java b/src/main/java/com/alipay/remoting/ConnectionHeartbeatManager.java
index 65344f48..02632561 100644
--- a/src/main/java/com/alipay/remoting/ConnectionHeartbeatManager.java
+++ b/src/main/java/com/alipay/remoting/ConnectionHeartbeatManager.java
@@ -27,14 +27,14 @@ public interface ConnectionHeartbeatManager {
/**
* disable heart beat for a certain connection
*
- * @param connection
+ * @param connection Connection
*/
void disableHeartbeat(Connection connection);
/**
* enable heart beat for a certain connection
*
- * @param connection
+ * @param connection Connection
*/
void enableHeartbeat(Connection connection);
}
diff --git a/src/main/java/com/alipay/remoting/ConnectionManager.java b/src/main/java/com/alipay/remoting/ConnectionManager.java
index d1b50192..95abc9af 100644
--- a/src/main/java/com/alipay/remoting/ConnectionManager.java
+++ b/src/main/java/com/alipay/remoting/ConnectionManager.java
@@ -27,10 +27,12 @@
* @author xiaomin.cxm
* @version $Id: ConnectionManager.java, v 0.1 Mar 7, 2016 2:42:46 PM xiaomin.cxm Exp $
*/
-public interface ConnectionManager extends Scannable {
+public interface ConnectionManager extends Scannable, LifeCycle {
+
/**
- * init
+ * Deprecated, use startup instead.
*/
+ @Deprecated
void init();
/**
@@ -97,7 +99,9 @@ public interface ConnectionManager extends Scannable {
/**
* Remove and close all connections from all {@link ConnectionPool}.
+ * Deprecated, use shutdown instead.
*/
+ @Deprecated
void removeAll();
/**
diff --git a/src/main/java/com/alipay/remoting/ConnectionMonitorStrategy.java b/src/main/java/com/alipay/remoting/ConnectionMonitorStrategy.java
index c0e82a09..eb082eae 100644
--- a/src/main/java/com/alipay/remoting/ConnectionMonitorStrategy.java
+++ b/src/main/java/com/alipay/remoting/ConnectionMonitorStrategy.java
@@ -32,8 +32,11 @@ public interface ConnectionMonitorStrategy {
/**
* Filter connections to monitor
*
- * @param connections
+ * Deprecated this method, this should be a private method.
+ *
+ * @param connections connections from a connection pool
*/
+ @Deprecated
Map> filter(List connections);
/**
@@ -42,7 +45,7 @@ public interface ConnectionMonitorStrategy {
* The previous connections in monitor of this protocol,
* will be dropped by monitor automatically.
*
- * @param connPools
+ * @param connPools connection pools
*/
void monitor(Map> connPools);
}
diff --git a/src/main/java/com/alipay/remoting/ConnectionPool.java b/src/main/java/com/alipay/remoting/ConnectionPool.java
index e6cfc8a3..9e717421 100644
--- a/src/main/java/com/alipay/remoting/ConnectionPool.java
+++ b/src/main/java/com/alipay/remoting/ConnectionPool.java
@@ -31,45 +31,36 @@
* @version $Id: ConnectionPool.java, v 0.1 Mar 8, 2016 11:04:54 AM xiaomin.cxm Exp $
*/
public class ConnectionPool implements Scannable {
- // ~~~ constants
- /** logger */
- private static final Logger logger = BoltLoggerFactory
- .getLogger("CommonDefault");
- /** connections */
- private CopyOnWriteArrayList conns = new CopyOnWriteArrayList();
+ private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
- /** strategy */
+ private CopyOnWriteArrayList connections;
private ConnectionSelectStrategy strategy;
-
- /** timestamp to record the last time this pool be accessed */
private volatile long lastAccessTimestamp;
-
- /** whether async create connection done */
- private volatile boolean asyncCreationDone = true;
+ private volatile boolean asyncCreationDone;
/**
* Constructor
*
- * @param strategy
+ * @param strategy ConnectionSelectStrategy
*/
public ConnectionPool(ConnectionSelectStrategy strategy) {
this.strategy = strategy;
+ this.connections = new CopyOnWriteArrayList();
+ this.asyncCreationDone = true;
}
- // ~~~ members
-
/**
* add a connection
*
- * @param connection
+ * @param connection Connection
*/
public void add(Connection connection) {
markAccess();
if (null == connection) {
return;
}
- boolean res = this.conns.addIfAbsent(connection);
+ boolean res = connections.addIfAbsent(connection);
if (res) {
connection.increaseRef();
}
@@ -78,23 +69,23 @@ public void add(Connection connection) {
/**
* check weather a connection already added
*
- * @param connection
- * @return
+ * @param connection Connection
+ * @return whether this pool contains the target connection
*/
public boolean contains(Connection connection) {
- return this.conns.contains(connection);
+ return connections.contains(connection);
}
/**
* removeAndTryClose a connection
*
- * @param connection
+ * @param connection Connection
*/
public void removeAndTryClose(Connection connection) {
if (null == connection) {
return;
}
- boolean res = this.conns.remove(connection);
+ boolean res = connections.remove(connection);
if (res) {
connection.decreaseRef();
}
@@ -107,23 +98,23 @@ public void removeAndTryClose(Connection connection) {
* remove all connections
*/
public void removeAllAndTryClose() {
- for (Connection conn : this.conns) {
+ for (Connection conn : connections) {
removeAndTryClose(conn);
}
- this.conns.clear();
+ connections.clear();
}
/**
* get a connection
*
- * @return
+ * @return Connection
*/
public Connection get() {
markAccess();
- if (null != this.conns) {
- List snapshot = new ArrayList(this.conns);
+ if (null != connections) {
+ List snapshot = new ArrayList(connections);
if (snapshot.size() > 0) {
- return this.strategy.select(snapshot);
+ return strategy.select(snapshot);
} else {
return null;
}
@@ -135,29 +126,29 @@ public Connection get() {
/**
* get all connections
*
- * @return
+ * @return Connection List
*/
public List getAll() {
markAccess();
- return new ArrayList(this.conns);
+ return new ArrayList(connections);
}
/**
* connection pool size
*
- * @return
+ * @return pool size
*/
public int size() {
- return this.conns.size();
+ return connections.size();
}
/**
* is connection pool empty
*
- * @return
+ * @return true if this connection pool has no connection
*/
public boolean isEmpty() {
- return this.conns.isEmpty();
+ return connections.isEmpty();
}
/**
@@ -166,51 +157,48 @@ public boolean isEmpty() {
* @return property value of lastAccessTimestamp
*/
public long getLastAccessTimestamp() {
- return this.lastAccessTimestamp;
+ return lastAccessTimestamp;
}
/**
* do mark the time stamp when access this pool
*/
private void markAccess() {
- this.lastAccessTimestamp = System.currentTimeMillis();
+ lastAccessTimestamp = System.currentTimeMillis();
}
/**
* is async create connection done
- * @return
+ * @return true if async create connection done
*/
public boolean isAsyncCreationDone() {
- return this.asyncCreationDone;
+ return asyncCreationDone;
}
/**
* do mark async create connection done
*/
public void markAsyncCreationDone() {
- this.asyncCreationDone = true;
+ asyncCreationDone = true;
}
/**
* do mark async create connection start
*/
public void markAsyncCreationStart() {
- this.asyncCreationDone = false;
+ asyncCreationDone = false;
}
- /**
- * @see com.alipay.remoting.Scannable#scan()
- */
@Override
public void scan() {
- if (null != this.conns && !this.conns.isEmpty()) {
- for (Connection conn : conns) {
+ if (null != connections && !connections.isEmpty()) {
+ for (Connection conn : connections) {
if (!conn.isFine()) {
logger.warn(
"Remove bad connection when scanning conns of ConnectionPool - {}:{}",
conn.getRemoteIP(), conn.getRemotePort());
conn.close();
- this.removeAndTryClose(conn);
+ removeAndTryClose(conn);
}
}
}
diff --git a/src/main/java/com/alipay/remoting/ConnectionSelectStrategy.java b/src/main/java/com/alipay/remoting/ConnectionSelectStrategy.java
index 05d87521..cf2e3f77 100644
--- a/src/main/java/com/alipay/remoting/ConnectionSelectStrategy.java
+++ b/src/main/java/com/alipay/remoting/ConnectionSelectStrategy.java
@@ -28,8 +28,8 @@ public interface ConnectionSelectStrategy {
/**
* select strategy
*
- * @param conns
- * @return
+ * @param connections source connections
+ * @return selected connection
*/
- Connection select(List conns);
+ Connection select(List connections);
}
\ No newline at end of file
diff --git a/src/main/java/com/alipay/remoting/DefaultClientConnectionManager.java b/src/main/java/com/alipay/remoting/DefaultClientConnectionManager.java
new file mode 100644
index 00000000..6f067ba0
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/DefaultClientConnectionManager.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting;
+
+import com.alipay.remoting.config.switches.GlobalSwitch;
+import com.alipay.remoting.connection.ConnectionFactory;
+
+/**
+ * Do some preparatory work in order to refactor the ConnectionManager in the next version.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2019-03-07 14:27
+ */
+public class DefaultClientConnectionManager extends DefaultConnectionManager implements
+ ClientConnectionManager {
+
+ public DefaultClientConnectionManager(ConnectionSelectStrategy connectionSelectStrategy,
+ ConnectionFactory connectionFactory,
+ ConnectionEventHandler connectionEventHandler,
+ ConnectionEventListener connectionEventListener) {
+ super(connectionSelectStrategy, connectionFactory, connectionEventHandler,
+ connectionEventListener);
+ }
+
+ public DefaultClientConnectionManager(ConnectionSelectStrategy connectionSelectStrategy,
+ ConnectionFactory connectionFactory,
+ ConnectionEventHandler connectionEventHandler,
+ ConnectionEventListener connectionEventListener,
+ GlobalSwitch globalSwitch) {
+ super(connectionSelectStrategy, connectionFactory, connectionEventHandler,
+ connectionEventListener, globalSwitch);
+ }
+
+ @Override
+ public void startup() throws LifeCycleException {
+ super.startup();
+
+ this.connectionEventHandler.setConnectionManager(this);
+ this.connectionEventHandler.setConnectionEventListener(connectionEventListener);
+ this.connectionFactory.init(connectionEventHandler);
+ }
+
+}
diff --git a/src/main/java/com/alipay/remoting/DefaultConnectionManager.java b/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
index 60520e05..c8d68d8c 100644
--- a/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
+++ b/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
@@ -26,12 +26,12 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.alipay.remoting.constant.Constants;
import com.alipay.remoting.log.BoltLoggerFactory;
import org.slf4j.Logger;
@@ -49,62 +49,17 @@
* @author xiaomin.cxm
* @version $Id: DefaultConnectionManager.java, v 0.1 Mar 8, 2016 10:43:51 AM xiaomin.cxm Exp $
*/
-public class DefaultConnectionManager implements ConnectionManager, ConnectionHeartbeatManager,
- Scannable {
+public class DefaultConnectionManager extends AbstractLifeCycle implements ConnectionManager,
+ ConnectionHeartbeatManager,
+ Scannable, LifeCycle {
- // ~~~ constants
- /**
- * logger
- */
- private static final Logger logger = BoltLoggerFactory
- .getLogger("CommonDefault");
-
- /**
- * default expire time to remove connection pool, time unit: milliseconds
- */
- private static final int DEFAULT_EXPIRE_TIME = 10 * 60 * 1000;
-
- /**
- * default retry times when falied to get result of FutureTask
- */
- private static final int DEFAULT_RETRY_TIMES = 2;
-
- // ~~~ members
-
- /**
- * min pool size for asyncCreateConnectionExecutor
- */
- private int minPoolSize = ConfigManager
- .conn_create_tp_min_size();
-
- /**
- * max pool size for asyncCreateConnectionExecutor
- */
- private int maxPoolSize = ConfigManager
- .conn_create_tp_max_size();
-
- /**
- * queue size for asyncCreateConnectionExecutor
- */
- private int queueSize = ConfigManager
- .conn_create_tp_queue_size();
-
- /**
- * keep alive time for asyncCreateConnectionExecutor
- */
- private long keepAliveTime = ConfigManager
- .conn_create_tp_keepalive();
-
- /**
- * executor initialie status
- */
- private volatile boolean executorInitialized;
+ private static final Logger logger = BoltLoggerFactory
+ .getLogger("CommonDefault");
/**
* executor to create connections in async way
- * note: this is lazy initialized
*/
- private Executor asyncCreateConnectionExecutor;
+ private ThreadPoolExecutor asyncCreateConnectionExecutor;
/**
* switch status
@@ -146,10 +101,8 @@ public class DefaultConnectionManager implements ConnectionManager, ConnectionHe
*/
protected ConnectionEventListener connectionEventListener;
- // ~~~ constructors
-
/**
- * Default constructor
+ * Default constructor.
*/
public DefaultConnectionManager() {
this.connTasks = new ConcurrentHashMap>();
@@ -158,7 +111,9 @@ public DefaultConnectionManager() {
}
/**
- * @param connectionSelectStrategy
+ * Construct with parameters.
+ *
+ * @param connectionSelectStrategy connection selection strategy
*/
public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy) {
this();
@@ -166,8 +121,10 @@ public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrateg
}
/**
- * @param connectionSelectStrategy
- * @param connectionFactory
+ * Construct with parameters.
+ *
+ * @param connectionSelectStrategy connection selection strategy
+ * @param connectionFactory connection factory
*/
public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy,
ConnectionFactory connectionFactory) {
@@ -176,23 +133,26 @@ public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrateg
}
/**
- * @param connectionFactory
- * @param addressParser
- * @param connectionEventHandler
+ * Construct with parameters.
+ * @param connectionFactory connection selection strategy
+ * @param addressParser address parser
+ * @param connectionEventHandler connection event handler
*/
public DefaultConnectionManager(ConnectionFactory connectionFactory,
RemotingAddressParser addressParser,
ConnectionEventHandler connectionEventHandler) {
- this(new RandomSelectStrategy(), connectionFactory);
+ this(new RandomSelectStrategy(null), connectionFactory);
this.addressParser = addressParser;
this.connectionEventHandler = connectionEventHandler;
}
/**
- * @param connectionSelectStrategy
- * @param connectionFactory
- * @param connectionEventHandler
- * @param connectionEventListener
+ * Construct with parameters.
+ *
+ * @param connectionSelectStrategy connection selection strategy
+ * @param connectionFactory connection factory
+ * @param connectionEventHandler connection event handler
+ * @param connectionEventListener connection event listener
*/
public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy,
ConnectionFactory connectionFactory,
@@ -204,27 +164,58 @@ public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrateg
}
/**
- * @param connectionSelectStrategy
- * @param connctionFactory
- * @param connectionEventHandler
- * @param connectionEventListener
- * @param globalSwitch
+ * Construct with parameters.
+ *
+ * @param connectionSelectStrategy connection selection strategy.
+ * @param connectionFactory connection factory
+ * @param connectionEventHandler connection event handler
+ * @param connectionEventListener connection event listener
+ * @param globalSwitch global switch
*/
public DefaultConnectionManager(ConnectionSelectStrategy connectionSelectStrategy,
- ConnectionFactory connctionFactory,
+ ConnectionFactory connectionFactory,
ConnectionEventHandler connectionEventHandler,
ConnectionEventListener connectionEventListener,
GlobalSwitch globalSwitch) {
- this(connectionSelectStrategy, connctionFactory, connectionEventHandler,
+ this(connectionSelectStrategy, connectionFactory, connectionEventHandler,
connectionEventListener);
this.globalSwitch = globalSwitch;
}
- // ~~~ interface methods
+ @Override
+ public void startup() throws LifeCycleException {
+ super.startup();
+
+ long keepAliveTime = ConfigManager.conn_create_tp_keepalive();
+ int queueSize = ConfigManager.conn_create_tp_queue_size();
+ int minPoolSize = ConfigManager.conn_create_tp_min_size();
+ int maxPoolSize = ConfigManager.conn_create_tp_max_size();
+ this.asyncCreateConnectionExecutor = new ThreadPoolExecutor(minPoolSize, maxPoolSize,
+ keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize),
+ new NamedThreadFactory("Bolt-conn-warmup-executor", true));
+ }
- /**
- * @see com.alipay.remoting.ConnectionManager#init()
- */
+ @Override
+ public void shutdown() throws LifeCycleException {
+ super.shutdown();
+
+ if (asyncCreateConnectionExecutor != null) {
+ asyncCreateConnectionExecutor.shutdown();
+ }
+
+ if (null == this.connTasks || this.connTasks.isEmpty()) {
+ return;
+ }
+ Iterator iter = this.connTasks.keySet().iterator();
+ while (iter.hasNext()) {
+ String poolKey = iter.next();
+ this.removeTask(poolKey);
+ iter.remove();
+ }
+ logger.warn("All connection pool and connections have been removed!");
+ }
+
+ @Deprecated
@Override
public void init() {
this.connectionEventHandler.setConnectionManager(this);
@@ -373,23 +364,20 @@ public void remove(String poolKey) {
/**
* Warning! This is weakly consistent implementation, to prevent lock the whole {@link ConcurrentHashMap}.
- *
- * @see ConnectionManager#removeAll()
*/
+ @Deprecated
@Override
public void removeAll() {
if (null == this.connTasks || this.connTasks.isEmpty()) {
return;
}
- if (null != this.connTasks && !this.connTasks.isEmpty()) {
- Iterator iter = this.connTasks.keySet().iterator();
- while (iter.hasNext()) {
- String poolKey = iter.next();
- this.removeTask(poolKey);
- iter.remove();
- }
- logger.warn("All connection pool and connections have been removed!");
+ Iterator iter = this.connTasks.keySet().iterator();
+ while (iter.hasNext()) {
+ String poolKey = iter.next();
+ this.removeTask(poolKey);
+ iter.remove();
}
+ logger.warn("All connection pool and connections have been removed!");
}
/**
@@ -430,8 +418,6 @@ public int count(String poolKey) {
/**
* in case of cache pollution and connection leak, to do schedule scan
- *
- * @see com.alipay.remoting.Scannable#scan()
*/
@Override
public void scan() {
@@ -443,7 +429,7 @@ public void scan() {
if (null != pool) {
pool.scan();
if (pool.isEmpty()) {
- if ((System.currentTimeMillis() - pool.getLastAccessTimestamp()) > DEFAULT_EXPIRE_TIME) {
+ if ((System.currentTimeMillis() - pool.getLastAccessTimestamp()) > Constants.DEFAULT_EXPIRE_TIME) {
iter.remove();
logger.warn("Remove expired pool task of poolKey {} which is empty.",
poolKey);
@@ -476,8 +462,7 @@ public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, Rem
* If no task cached, create one and initialize the connections.
* If task cached, check whether the number of connections adequate, if not then heal it.
*
- * @param url
- * @return
+ * @param url target url
* @throws InterruptedException
* @throws RemotingException
*/
@@ -499,7 +484,7 @@ public void createConnectionAndHealIfNeed(Url url) throws InterruptedException,
*/
@Override
public Connection create(Url url) throws RemotingException {
- Connection conn = null;
+ Connection conn;
try {
conn = this.connectionFactory.createConnection(url);
} catch (Exception e) {
@@ -514,14 +499,12 @@ public Connection create(Url url) throws RemotingException {
*/
@Override
public Connection create(String ip, int port, int connectTimeout) throws RemotingException {
- Connection conn = null;
try {
- conn = this.connectionFactory.createConnection(ip, port, connectTimeout);
+ return this.connectionFactory.createConnection(ip, port, connectTimeout);
} catch (Exception e) {
throw new RemotingException("Create connection failed. The address is " + ip + ":"
+ port, e);
}
- return conn;
}
/**
@@ -554,13 +537,11 @@ public void enableHeartbeat(Connection connection) {
}
}
- // ~~~ private methods
-
/**
* get connection pool from future task
*
- * @param task
- * @return
+ * @param task future task
+ * @return connection pool
*/
private ConnectionPool getConnectionPool(RunStateRecordedFutureTask task) {
return FutureTaskUtil.getFutureTaskResult(task, logger);
@@ -581,10 +562,10 @@ private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey,
Callable callable)
throws RemotingException,
InterruptedException {
- RunStateRecordedFutureTask initialTask = null;
+ RunStateRecordedFutureTask initialTask;
ConnectionPool pool = null;
- int retry = DEFAULT_RETRY_TIMES;
+ int retry = Constants.DEFAULT_RETRY_TIMES;
int timesOfResultNull = 0;
int timesOfInterrupt = 0;
@@ -592,10 +573,11 @@ private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey,
for (int i = 0; (i < retry) && (pool == null); ++i) {
initialTask = this.connTasks.get(poolKey);
if (null == initialTask) {
- initialTask = new RunStateRecordedFutureTask(callable);
- initialTask = this.connTasks.putIfAbsent(poolKey, initialTask);
+ RunStateRecordedFutureTask newTask = new RunStateRecordedFutureTask(
+ callable);
+ initialTask = this.connTasks.putIfAbsent(poolKey, newTask);
if (null == initialTask) {
- initialTask = this.connTasks.get(poolKey);
+ initialTask = newTask;
initialTask.run();
}
}
@@ -641,9 +623,9 @@ private ConnectionPool getConnectionPoolAndCreateIfAbsent(String poolKey,
/**
* remove task and remove all connections
*
- * @param poolKey
+ * @param poolKey target pool key
*/
- private void removeTask(String poolKey) {
+ protected void removeTask(String poolKey) {
RunStateRecordedFutureTask task = this.connTasks.remove(poolKey);
if (null != task) {
ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(task, logger);
@@ -656,8 +638,8 @@ private void removeTask(String poolKey) {
/**
* execute heal connection tasks if the actual number of connections in pool is less than expected
*
- * @param pool
- * @param url
+ * @param pool connection pool
+ * @param url target url
*/
private void healIfNeed(ConnectionPool pool, Url url) throws RemotingException,
InterruptedException {
@@ -667,10 +649,11 @@ private void healIfNeed(ConnectionPool pool, Url url) throws RemotingException,
if (pool.isAsyncCreationDone() && pool.size() < url.getConnNum()) {
FutureTask task = this.healTasks.get(poolKey);
if (null == task) {
- task = new FutureTask(new HealConnectionCall(url, pool));
- task = this.healTasks.putIfAbsent(poolKey, task);
+ FutureTask newTask = new FutureTask(new HealConnectionCall(url,
+ pool));
+ task = this.healTasks.putIfAbsent(poolKey, newTask);
if (null == task) {
- task = this.healTasks.get(poolKey);
+ task = newTask;
task.run();
}
}
@@ -717,7 +700,7 @@ public ConnectionPoolCall() {
/**
* create a {@link ConnectionPool} and init connections with the specified {@link Url}
*
- * @param url
+ * @param url target url
*/
public ConnectionPoolCall(Url url) {
this.whetherInitConnection = true;
@@ -753,7 +736,7 @@ private class HealConnectionCall implements Callable {
/**
* create a {@link ConnectionPool} and init connections with the specified {@link Url}
*
- * @param url
+ * @param url target url
*/
public HealConnectionCall(Url url, ConnectionPool pool) {
this.url = url;
@@ -770,9 +753,9 @@ public Integer call() throws Exception {
/**
* do create connections
*
- * @param url
- * @param pool
- * @param taskName
+ * @param url target url
+ * @param pool connection pool
+ * @param taskName task name
* @param syncCreateNumWhenNotWarmup you can specify this param to ensure at least desired number of connections available in sync way
* @throws RemotingException
*/
@@ -780,78 +763,64 @@ private void doCreate(final Url url, final ConnectionPool pool, final String tas
final int syncCreateNumWhenNotWarmup) throws RemotingException {
final int actualNum = pool.size();
final int expectNum = url.getConnNum();
- if (actualNum < expectNum) {
- if (logger.isDebugEnabled()) {
- logger.debug("actual num {}, expect num {}, task name {}", actualNum, expectNum,
- taskName);
+ if (actualNum >= expectNum) {
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("actual num {}, expect num {}, task name {}", actualNum, expectNum,
+ taskName);
+ }
+ if (url.isConnWarmup()) {
+ for (int i = actualNum; i < expectNum; ++i) {
+ Connection connection = create(url);
+ pool.add(connection);
+ }
+ } else {
+ if (syncCreateNumWhenNotWarmup < 0 || syncCreateNumWhenNotWarmup > url.getConnNum()) {
+ throw new IllegalArgumentException(
+ "sync create number when not warmup should be [0," + url.getConnNum() + "]");
}
- if (url.isConnWarmup()) {
- for (int i = actualNum; i < expectNum; ++i) {
+ // create connection in sync way
+ if (syncCreateNumWhenNotWarmup > 0) {
+ for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
Connection connection = create(url);
pool.add(connection);
}
- } else {
- if (syncCreateNumWhenNotWarmup < 0 || syncCreateNumWhenNotWarmup > url.getConnNum()) {
- throw new IllegalArgumentException(
- "sync create number when not warmup should be [0," + url.getConnNum() + "]");
+ if (syncCreateNumWhenNotWarmup >= url.getConnNum()) {
+ return;
}
- // create connection in sync way
- if (syncCreateNumWhenNotWarmup > 0) {
- for (int i = 0; i < syncCreateNumWhenNotWarmup; ++i) {
- Connection connection = create(url);
- pool.add(connection);
- }
- if (syncCreateNumWhenNotWarmup == url.getConnNum()) {
- return;
- }
- }
- // initialize executor in lazy way
- initializeExecutor();
- pool.markAsyncCreationStart();// mark the start of async
- try {
- this.asyncCreateConnectionExecutor.execute(new Runnable() {
- @Override
- public void run() {
- try {
- for (int i = pool.size(); i < url.getConnNum(); ++i) {
- Connection conn = null;
- try {
- conn = create(url);
- } catch (RemotingException e) {
- logger
- .error(
- "Exception occurred in async create connection thread for {}, taskName {}",
- url.getUniqueKey(), taskName, e);
- }
- pool.add(conn);
+ }
+
+ pool.markAsyncCreationStart();// mark the start of async
+ try {
+ this.asyncCreateConnectionExecutor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = pool.size(); i < url.getConnNum(); ++i) {
+ Connection conn = null;
+ try {
+ conn = create(url);
+ } catch (RemotingException e) {
+ logger
+ .error(
+ "Exception occurred in async create connection thread for {}, taskName {}",
+ url.getUniqueKey(), taskName, e);
}
- } finally {
- pool.markAsyncCreationDone();// mark the end of async
+ pool.add(conn);
}
+ } finally {
+ pool.markAsyncCreationDone();// mark the end of async
}
- });
- } catch (RejectedExecutionException e) {
- pool.markAsyncCreationDone();// mark the end of async when reject
- throw e;
- }
- } // end of NOT warm up
- } // end of if
- }
-
- /**
- * initialize executor
- */
- private void initializeExecutor() {
- if (!this.executorInitialized) {
- this.executorInitialized = true;
- this.asyncCreateConnectionExecutor = new ThreadPoolExecutor(minPoolSize, maxPoolSize,
- keepAliveTime, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize),
- new NamedThreadFactory("Bolt-conn-warmup-executor", true));
- }
+ }
+ });
+ } catch (RejectedExecutionException e) {
+ pool.markAsyncCreationDone();// mark the end of async when reject
+ throw e;
+ }
+ } // end of NOT warm up
}
- // ~~~ getters and setters
-
/**
* Getter method for property connectionSelectStrategy.
*
diff --git a/src/main/java/com/alipay/remoting/DefaultConnectionMonitor.java b/src/main/java/com/alipay/remoting/DefaultConnectionMonitor.java
index bf0b2223..05545a72 100644
--- a/src/main/java/com/alipay/remoting/DefaultConnectionMonitor.java
+++ b/src/main/java/com/alipay/remoting/DefaultConnectionMonitor.java
@@ -33,72 +33,78 @@
* @author tsui
* @version $Id: DefaultConnectionMonitor.java, v 0.1 2017-02-21 12:09 tsui Exp $
*/
-public class DefaultConnectionMonitor {
+public class DefaultConnectionMonitor extends AbstractLifeCycle {
- private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
+ private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
- /** Connection pools to monitor */
- private DefaultConnectionManager connectionManager;
+ private final DefaultConnectionManager connectionManager;
+ private final ConnectionMonitorStrategy strategy;
- /** Monitor strategy */
- private ConnectionMonitorStrategy strategy;
-
- private ScheduledThreadPoolExecutor executor;
+ private ScheduledThreadPoolExecutor executor;
public DefaultConnectionMonitor(ConnectionMonitorStrategy strategy,
DefaultConnectionManager connectionManager) {
+ if (strategy == null) {
+ throw new IllegalArgumentException("null strategy");
+ }
+
+ if (connectionManager == null) {
+ throw new IllegalArgumentException("null connectionManager");
+ }
+
this.strategy = strategy;
this.connectionManager = connectionManager;
}
- /**
- * Start schedule task
- *
- */
- public void start() {
- /** initial delay to execute schedule task, unit: ms */
+ @Override
+ public void startup() throws LifeCycleException {
+ super.startup();
+
+ /* initial delay to execute schedule task, unit: ms */
long initialDelay = ConfigManager.conn_monitor_initial_delay();
- /** period of schedule task, unit: ms*/
+ /* period of schedule task, unit: ms*/
long period = ConfigManager.conn_monitor_period();
this.executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(
"ConnectionMonitorThread", true), new ThreadPoolExecutor.AbortPolicy());
- MonitorTask monitorTask = new MonitorTask();
- this.executor.scheduleAtFixedRate(monitorTask, initialDelay, period, TimeUnit.MILLISECONDS);
+ this.executor.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Map> connPools = connectionManager
+ .getConnPools();
+ strategy.monitor(connPools);
+ } catch (Exception e) {
+ logger.warn("MonitorTask error", e);
+ }
+ }
+ }, initialDelay, period, TimeUnit.MILLISECONDS);
}
- /**
- * cancel task and shutdown executor
- *
- * @throws Exception
- */
- public void destroy() {
+ @Override
+ public void shutdown() throws LifeCycleException {
+ super.shutdown();
+
executor.purge();
executor.shutdown();
}
/**
- * Monitor Task
- *
- * @author tsui
- * @version $Id: DefaultConnectionMonitor.java, v 0.1 2017-02-21 12:09 tsui Exp $
+ * Start schedule task
+ * please use {@link DefaultConnectionMonitor#startup()} instead
*/
- private class MonitorTask implements Runnable {
- /**
- * @see java.lang.Runnable#run()
- */
- @Override
- public void run() {
- try {
- if (strategy != null) {
- Map> connPools = connectionManager
- .getConnPools();
- strategy.monitor(connPools);
- }
- } catch (Exception e) {
- logger.warn("MonitorTask error", e);
- }
- }
+ @Deprecated
+ public void start() {
+ startup();
+ }
+
+ /**
+ * cancel task and shutdown executor
+ * please use {@link DefaultConnectionMonitor#shutdown()} instead
+ */
+ @Deprecated
+ public void destroy() {
+ shutdown();
}
}
diff --git a/src/main/java/com/alipay/remoting/DefaultServerConnectionManager.java b/src/main/java/com/alipay/remoting/DefaultServerConnectionManager.java
new file mode 100644
index 00000000..b39ed17e
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/DefaultServerConnectionManager.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting;
+
+/**
+ * Do some preparatory work in order to refactor the ConnectionManager in the next version.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2019-03-07 14:40
+ */
+public class DefaultServerConnectionManager extends DefaultConnectionManager implements
+ ServerConnectionManager {
+
+ public DefaultServerConnectionManager(ConnectionSelectStrategy connectionSelectStrategy) {
+ super(connectionSelectStrategy);
+ }
+
+}
diff --git a/src/main/java/com/alipay/remoting/LifeCycle.java b/src/main/java/com/alipay/remoting/LifeCycle.java
new file mode 100644
index 00000000..4dff275b
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/LifeCycle.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting;
+
+/**
+ * @author chengyi (mark.lx@antfin.com) 2018-11-05 14:27
+ */
+public interface LifeCycle {
+
+ void startup() throws LifeCycleException;
+
+ void shutdown() throws LifeCycleException;
+
+ boolean isStarted();
+}
diff --git a/src/main/java/com/alipay/remoting/LifeCycleException.java b/src/main/java/com/alipay/remoting/LifeCycleException.java
new file mode 100644
index 00000000..e08153b6
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/LifeCycleException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting;
+
+/**
+ * @author chengyi (mark.lx@antfin.com) 2018-11-05 14:42
+ */
+public class LifeCycleException extends RuntimeException {
+
+ private static final long serialVersionUID = -5581833793111988391L;
+
+ public LifeCycleException(String message) {
+ super(message);
+ }
+}
diff --git a/src/main/java/com/alipay/remoting/RandomSelectStrategy.java b/src/main/java/com/alipay/remoting/RandomSelectStrategy.java
index 1f711bd5..98d0ce2a 100644
--- a/src/main/java/com/alipay/remoting/RandomSelectStrategy.java
+++ b/src/main/java/com/alipay/remoting/RandomSelectStrategy.java
@@ -34,55 +34,45 @@
* @version $Id: RandomSelectStrategy.java, v 0.1 Mar 30, 2016 8:38:40 PM yunliang.shi Exp $
*/
public class RandomSelectStrategy implements ConnectionSelectStrategy {
- /** logger */
+
private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
- /** max retry times */
private static final int MAX_TIMES = 5;
-
- /** random */
private final Random random = new Random();
-
- private GlobalSwitch globalSwitch;
-
- public RandomSelectStrategy() {
- }
+ private final GlobalSwitch globalSwitch;
public RandomSelectStrategy(GlobalSwitch globalSwitch) {
this.globalSwitch = globalSwitch;
}
- /**
- * @see com.alipay.remoting.ConnectionSelectStrategy#select(java.util.List)
- */
@Override
- public Connection select(List conns) {
+ public Connection select(List connections) {
try {
- if (conns == null) {
+ if (connections == null) {
return null;
}
- int size = conns.size();
+ int size = connections.size();
if (size == 0) {
return null;
}
- Connection result = null;
+ Connection result;
if (null != this.globalSwitch
&& this.globalSwitch.isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
- List serviceStatusOnConns = new ArrayList();
- for (Connection conn : conns) {
+ List serviceStatusOnConnections = new ArrayList();
+ for (Connection conn : connections) {
String serviceStatus = (String) conn.getAttribute(Configs.CONN_SERVICE_STATUS);
if (!StringUtils.equals(serviceStatus, Configs.CONN_SERVICE_STATUS_OFF)) {
- serviceStatusOnConns.add(conn);
+ serviceStatusOnConnections.add(conn);
}
}
- if (serviceStatusOnConns.size() == 0) {
+ if (serviceStatusOnConnections.size() == 0) {
throw new Exception(
"No available connection when select in RandomSelectStrategy.");
}
- result = randomGet(serviceStatusOnConns);
+ result = randomGet(serviceStatusOnConnections);
} else {
- result = randomGet(conns);
+ result = randomGet(connections);
}
return result;
} catch (Throwable e) {
@@ -94,19 +84,19 @@ public Connection select(List conns) {
/**
* get one connection randomly
*
- * @param conns
- * @return
+ * @param connections source connections
+ * @return result connection
*/
- private Connection randomGet(List conns) {
- if (null == conns || conns.isEmpty()) {
+ private Connection randomGet(List connections) {
+ if (null == connections || connections.isEmpty()) {
return null;
}
- int size = conns.size();
+ int size = connections.size();
int tries = 0;
Connection result = null;
while ((result == null || !result.isFine()) && tries++ < MAX_TIMES) {
- result = conns.get(this.random.nextInt(size));
+ result = connections.get(this.random.nextInt(size));
}
if (result != null && !result.isFine()) {
diff --git a/src/main/java/com/alipay/remoting/ReconnectManager.java b/src/main/java/com/alipay/remoting/ReconnectManager.java
index c0253fb4..bb64970e 100644
--- a/src/main/java/com/alipay/remoting/ReconnectManager.java
+++ b/src/main/java/com/alipay/remoting/ReconnectManager.java
@@ -22,7 +22,6 @@
import org.slf4j.Logger;
-import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.log.BoltLoggerFactory;
/**
@@ -31,132 +30,147 @@
* @author yunliang.shi
* @version $Id: ReconnectManager.java, v 0.1 Mar 11, 2016 5:20:50 PM yunliang.shi Exp $
*/
-public class ReconnectManager {
- private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
+public class ReconnectManager extends AbstractLifeCycle implements Reconnector {
- class ReconnectTask {
- Url url;
- }
-
- private final LinkedBlockingQueue tasks = new LinkedBlockingQueue();
+ private static final Logger logger = BoltLoggerFactory
+ .getLogger("CommonDefault");
- protected final List canceled = new CopyOnWriteArrayList();
- private volatile boolean started;
+ private static final int HEAL_CONNECTION_INTERVAL = 1000;
- private int healConnectionInterval = 1000;
+ private final ConnectionManager connectionManager;
+ private final LinkedBlockingQueue tasks;
+ private final List canceled;
- private final Thread healConnectionThreads;
-
- private ConnectionManager connectionManager;
+ private Thread healConnectionThreads;
public ReconnectManager(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
- this.healConnectionThreads = new Thread(new HealConnectionRunner());
- this.healConnectionThreads.start();
- this.started = true;
+ this.tasks = new LinkedBlockingQueue();
+ this.canceled = new CopyOnWriteArrayList();
}
- private void doReconnectTask(ReconnectTask task) throws InterruptedException, RemotingException {
- connectionManager.createConnectionAndHealIfNeed(task.url);
+ @Override
+ public void reconnect(Url url) {
+ tasks.add(new ReconnectTask(url));
}
- private void addReconnectTask(ReconnectTask task) {
- tasks.add(task);
- }
-
- public void addCancelUrl(Url url) {
+ @Override
+ public void disableReconnect(Url url) {
canceled.add(url);
}
- public void removeCancelUrl(Url url) {
+ @Override
+ public void enableReconnect(Url url) {
canceled.remove(url);
}
+ @Override
+ public void startup() throws LifeCycleException {
+ super.startup();
+
+ this.healConnectionThreads = new Thread(new HealConnectionRunner());
+ this.healConnectionThreads.start();
+ }
+
+ @Override
+ public void shutdown() throws LifeCycleException {
+ super.shutdown();
+
+ healConnectionThreads.interrupt();
+ this.tasks.clear();
+ this.canceled.clear();
+ }
+
/**
- * add reconnect task
- *
- * @param url
+ * please use {@link Reconnector#disableReconnect(Url)} instead
*/
- public void addReconnectTask(Url url) {
- ReconnectTask task = new ReconnectTask();
- task.url = url;
- tasks.add(task);
+ @Deprecated
+ public void addCancelUrl(Url url) {
+ disableReconnect(url);
}
/**
- * Check task whether is valid, if canceled, is not valid
- *
- * @param task
- * @return
+ * please use {@link Reconnector#enableReconnect(Url)} instead
*/
- private boolean isValidTask(ReconnectTask task) {
- return !canceled.contains(task.url);
+ @Deprecated
+ public void removeCancelUrl(Url url) {
+ enableReconnect(url);
}
/**
- * stop reconnect thread
+ * please use {@link Reconnector#reconnect(Url)} instead
*/
- public void stop() {
- if (!this.started) {
- return;
- }
- this.started = false;
- healConnectionThreads.interrupt();
- this.tasks.clear();
- this.canceled.clear();
+ @Deprecated
+ public void addReconnectTask(Url url) {
+ reconnect(url);
}
/**
- * heal connection thread
- *
- * @author yunliang.shi
- * @version $Id: ReconnectManager.java, v 0.1 Mar 11, 2016 5:24:08 PM yunliang.shi Exp $
+ * please use {@link Reconnector#shutdown()} instead
*/
+ @Deprecated
+ public void stop() {
+ shutdown();
+ }
+
private final class HealConnectionRunner implements Runnable {
private long lastConnectTime = -1;
@Override
public void run() {
- while (ReconnectManager.this.started) {
+ while (isStarted()) {
long start = -1;
ReconnectTask task = null;
try {
- if (this.lastConnectTime > 0
- && this.lastConnectTime < ReconnectManager.this.healConnectionInterval
- || this.lastConnectTime < 0) {
- Thread.sleep(ReconnectManager.this.healConnectionInterval);
+ if (this.lastConnectTime < HEAL_CONNECTION_INTERVAL) {
+ Thread.sleep(HEAL_CONNECTION_INTERVAL);
}
+
try {
task = ReconnectManager.this.tasks.take();
} catch (InterruptedException e) {
// ignore
}
+ if (task == null) {
+ continue;
+ }
+
start = System.currentTimeMillis();
- if (ReconnectManager.this.isValidTask(task)) {
- try {
- ReconnectManager.this.doReconnectTask(task);
- } catch (InterruptedException e) {
- throw e;
- }
+ if (!canceled.contains(task.url)) {
+ task.run();
} else {
logger.warn("Invalid reconnect request task {}, cancel list size {}",
task.url, canceled.size());
}
this.lastConnectTime = System.currentTimeMillis() - start;
} catch (Exception e) {
- retryWhenException(start, task, e);
+ if (start != -1) {
+ this.lastConnectTime = System.currentTimeMillis() - start;
+ }
+
+ if (task != null) {
+ logger.warn("reconnect target: {} failed.", task.url, e);
+ tasks.add(task);
+ }
}
}
}
+ }
- private void retryWhenException(long start, ReconnectTask task, Exception e) {
- if (start != -1) {
- this.lastConnectTime = System.currentTimeMillis() - start;
- }
- if (task != null) {
- logger.warn("reconnect target: {} failed.", task.url, e);
- ReconnectManager.this.addReconnectTask(task);
+ private class ReconnectTask implements Runnable {
+ Url url;
+
+ public ReconnectTask(Url url) {
+ this.url = url;
+ }
+
+ @Override
+ public void run() {
+ try {
+ connectionManager.createConnectionAndHealIfNeed(url);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/src/main/java/com/alipay/remoting/Reconnector.java b/src/main/java/com/alipay/remoting/Reconnector.java
new file mode 100644
index 00000000..89341ef0
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/Reconnector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting;
+
+/**
+ * Reconnect manager interface.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2018-11-05 17:43
+ */
+public interface Reconnector extends LifeCycle {
+
+ /**
+ * Do reconnecting in async mode.
+ *
+ * @param url target url
+ */
+ void reconnect(Url url);
+
+ /**
+ * Disable reconnect to the target url.
+ *
+ * @param url target url
+ */
+ void disableReconnect(Url url);
+
+ /**
+ * Enable reconnect to the target url.
+ *
+ * @param url target url
+ */
+ void enableReconnect(Url url);
+}
diff --git a/src/main/java/com/alipay/remoting/RemotingServer.java b/src/main/java/com/alipay/remoting/RemotingServer.java
index b43a9cb2..59e74c6d 100644
--- a/src/main/java/com/alipay/remoting/RemotingServer.java
+++ b/src/main/java/com/alipay/remoting/RemotingServer.java
@@ -18,12 +18,13 @@
import java.util.concurrent.ExecutorService;
+import com.alipay.remoting.config.Configurable;
import com.alipay.remoting.rpc.protocol.UserProcessor;
/**
* @author chengyi (mark.lx@antfin.com) 2018-06-16 06:55
*/
-public interface RemotingServer {
+public interface RemotingServer extends Configurable, LifeCycle {
/**
* init the server
@@ -32,16 +33,18 @@ public interface RemotingServer {
void init();
/**
- * Start the server.
+ * Start the server. Use startup() instead.
*/
+ @Deprecated
boolean start();
/**
- * Stop the server.
+ * Stop the server. Use shutdown() instead.
*
* Remoting server can not be used any more after stop.
* If you need, you should destroy it, and instantiate another one.
*/
+ @Deprecated
boolean stop();
/**
diff --git a/src/main/java/com/alipay/remoting/ScheduledDisconnectStrategy.java b/src/main/java/com/alipay/remoting/ScheduledDisconnectStrategy.java
index dbc58025..b60380f3 100644
--- a/src/main/java/com/alipay/remoting/ScheduledDisconnectStrategy.java
+++ b/src/main/java/com/alipay/remoting/ScheduledDisconnectStrategy.java
@@ -17,7 +17,6 @@
package com.alipay.remoting;
import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -43,26 +42,25 @@
* @version $Id: ScheduledDisconnectStrategy.java, v 0.1 2017-02-21 14:14 tsui Exp $
*/
public class ScheduledDisconnectStrategy implements ConnectionMonitorStrategy {
- private static final Logger logger = BoltLoggerFactory
- .getLogger("CommonDefault");
+ private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
- /** the connections threshold of each {@link Url#uniqueKey} */
- private static final int CONNECTION_THRESHOLD = ConfigManager.conn_threshold();
+ private final int connectionThreshold;
+ private final Random random;
- /** fresh select connections to be closed */
- private Map freshSelectConnections = new ConcurrentHashMap();
-
- /** Retry detect period for ScheduledDisconnectStrategy*/
- private static int RETRY_DETECT_PERIOD = ConfigManager.retry_detect_period();
-
- /** random */
- private Random random = new Random();
+ public ScheduledDisconnectStrategy() {
+ this.connectionThreshold = ConfigManager.conn_threshold();
+ this.random = new Random();
+ }
/**
- * Filter connections to monitor
+ * This method only invoked in ScheduledDisconnectStrategy, so no need to be exposed.
+ * This method will be remove in next version, do not use this method.
*
- * @param connections
+ * The user cannot call ScheduledDisconnectStrategy#filter, so modifying the implementation of this method is safe.
+ *
+ * @param connections connections from a connection pool
*/
+ @Deprecated
@Override
public Map> filter(List connections) {
List serviceOnConnections = new ArrayList();
@@ -70,14 +68,10 @@ public Map> filter(List connections) {
Map> filteredConnections = new ConcurrentHashMap>();
for (Connection connection : connections) {
- String serviceStatus = (String) connection.getAttribute(Configs.CONN_SERVICE_STATUS);
- if (serviceStatus != null) {
- if (connection.isInvokeFutureMapFinish()
- && !freshSelectConnections.containsValue(connection)) {
- serviceOffConnections.add(connection);
- }
- } else {
+ if (isConnectionOn(connection)) {
serviceOnConnections.add(connection);
+ } else {
+ serviceOffConnections.add(connection);
}
}
@@ -86,59 +80,51 @@ public Map> filter(List connections) {
return filteredConnections;
}
- /**
- * Monitor connections and close connections with status is off
- *
- * @param connPools
- */
@Override
public void monitor(Map> connPools) {
try {
- if (null != connPools && !connPools.isEmpty()) {
- Iterator>> iter = connPools
- .entrySet().iterator();
-
- while (iter.hasNext()) {
- Map.Entry> entry = iter
- .next();
- String poolKey = entry.getKey();
- ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(),
- logger);
-
- List connections = pool.getAll();
- Map> filteredConnectons = this.filter(connections);
- List serviceOnConnections = filteredConnectons
- .get(Configs.CONN_SERVICE_STATUS_ON);
- List serviceOffConnections = filteredConnectons
- .get(Configs.CONN_SERVICE_STATUS_OFF);
- if (serviceOnConnections.size() > CONNECTION_THRESHOLD) {
- Connection freshSelectConnect = serviceOnConnections.get(random
- .nextInt(serviceOnConnections.size()));
- freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS,
- Configs.CONN_SERVICE_STATUS_OFF);
-
- Connection lastSelectConnect = freshSelectConnections.remove(poolKey);
- freshSelectConnections.put(poolKey, freshSelectConnect);
+ if (connPools == null || connPools.size() == 0) {
+ return;
+ }
- closeFreshSelectConnections(lastSelectConnect, serviceOffConnections);
+ for (Map.Entry> entry : connPools
+ .entrySet()) {
+ String poolKey = entry.getKey();
+ ConnectionPool pool = FutureTaskUtil.getFutureTaskResult(entry.getValue(), logger);
+ List serviceOnConnections = new ArrayList();
+ List serviceOffConnections = new ArrayList();
+ for (Connection connection : pool.getAll()) {
+ if (isConnectionOn(connection)) {
+ serviceOnConnections.add(connection);
} else {
- if (freshSelectConnections.containsKey(poolKey)) {
- Connection lastSelectConnect = freshSelectConnections.remove(poolKey);
- closeFreshSelectConnections(lastSelectConnect, serviceOffConnections);
- }
- if (logger.isInfoEnabled()) {
- logger
- .info(
- "the size of serviceOnConnections [{}] reached CONNECTION_THRESHOLD [{}].",
- serviceOnConnections.size(), CONNECTION_THRESHOLD);
- }
+ serviceOffConnections.add(connection);
+ }
+ }
+
+ if (serviceOnConnections.size() > connectionThreshold) {
+ Connection freshSelectConnect = serviceOnConnections.get(random
+ .nextInt(serviceOnConnections.size()));
+ freshSelectConnect.setAttribute(Configs.CONN_SERVICE_STATUS,
+ Configs.CONN_SERVICE_STATUS_OFF);
+ serviceOffConnections.add(freshSelectConnect);
+ } else {
+ if (logger.isInfoEnabled()) {
+ logger.info("serviceOnConnections({}) size[{}], CONNECTION_THRESHOLD[{}].",
+ poolKey, serviceOnConnections.size(), connectionThreshold);
}
+ }
- for (Connection offConn : serviceOffConnections) {
+ for (Connection offConn : serviceOffConnections) {
+ if (offConn.isInvokeFutureMapFinish()) {
if (offConn.isFine()) {
offConn.close();
}
+ } else {
+ if (logger.isInfoEnabled()) {
+ logger.info("Address={} won't close at this schedule turn",
+ RemotingUtil.parseRemoteAddress(offConn.getChannel()));
+ }
}
}
}
@@ -147,30 +133,8 @@ public void monitor(Map> conn
}
}
- /**
- * close the connection of the fresh select connections
- *
- * @param lastSelectConnect
- * @param serviceOffConnections
- * @throws InterruptedException
- */
- private void closeFreshSelectConnections(Connection lastSelectConnect,
- List serviceOffConnections)
- throws InterruptedException {
- if (null != lastSelectConnect) {
- if (lastSelectConnect.isInvokeFutureMapFinish()) {
- serviceOffConnections.add(lastSelectConnect);
- } else {
- Thread.sleep(RETRY_DETECT_PERIOD);
- if (lastSelectConnect.isInvokeFutureMapFinish()) {
- serviceOffConnections.add(lastSelectConnect);
- } else {
- if (logger.isInfoEnabled()) {
- logger.info("Address={} won't close at this schedule turn",
- RemotingUtil.parseRemoteAddress(lastSelectConnect.getChannel()));
- }
- }
- }
- }
+ private boolean isConnectionOn(Connection connection) {
+ String serviceStatus = (String) connection.getAttribute(Configs.CONN_SERVICE_STATUS);
+ return serviceStatus == null || Boolean.parseBoolean(serviceStatus);
}
}
diff --git a/src/main/java/com/alipay/remoting/ServerConnectionManager.java b/src/main/java/com/alipay/remoting/ServerConnectionManager.java
new file mode 100644
index 00000000..c1e489e2
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/ServerConnectionManager.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting;
+
+/**
+ * Connection manager in server side.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2019-03-07 12:13
+ */
+public interface ServerConnectionManager extends ConnectionManager, LifeCycle {
+
+}
diff --git a/src/main/java/com/alipay/remoting/ServerIdleHandler.java b/src/main/java/com/alipay/remoting/ServerIdleHandler.java
index b7f5c0e3..0c7cc74a 100644
--- a/src/main/java/com/alipay/remoting/ServerIdleHandler.java
+++ b/src/main/java/com/alipay/remoting/ServerIdleHandler.java
@@ -40,7 +40,6 @@ public class ServerIdleHandler extends ChannelDuplexHandler {
private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
/**
- *
* @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, java.lang.Object)
*/
@Override
diff --git a/src/main/java/com/alipay/remoting/Url.java b/src/main/java/com/alipay/remoting/Url.java
index c0cbe798..732dc6ae 100644
--- a/src/main/java/com/alipay/remoting/Url.java
+++ b/src/main/java/com/alipay/remoting/Url.java
@@ -151,7 +151,7 @@ public Url(String originUrl, String ip, int port, String uniqueKey, Properties p
/**
* Get property value according to property key
*
- * @param key
+ * @param key property key
* @return property value
*/
public String getProperty(String key) {
@@ -254,7 +254,8 @@ public int hashCode() {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("Origin url [" + this.originUrl + "], Unique key [" + this.uniqueKey + "].");
+ sb.append("Origin url [").append(this.originUrl).append("], Unique key [")
+ .append(this.uniqueKey).append("].");
return sb.toString();
}
@@ -268,11 +269,8 @@ public String toString() {
private static final Logger logger = BoltLoggerFactory
.getLogger("RpcRemoting");
- /**
- * @throws Throwable
- */
@Override
- protected void finalize() throws Throwable {
+ protected void finalize() {
try {
isCollected = true;
parsedUrls.remove(this.getOriginUrl());
diff --git a/src/main/java/com/alipay/remoting/codec/AbstractBatchDecoder.java b/src/main/java/com/alipay/remoting/codec/AbstractBatchDecoder.java
index a381ee50..ce9f115f 100644
--- a/src/main/java/com/alipay/remoting/codec/AbstractBatchDecoder.java
+++ b/src/main/java/com/alipay/remoting/codec/AbstractBatchDecoder.java
@@ -36,7 +36,6 @@
*
* if (msg instanceof List) {
* processorManager.getDefaultExecutor().execute(new Runnable() {
- * @Override
* public void run() {
* // batch submit to an executor
* for (Object m : (List>) msg) {
diff --git a/src/main/java/com/alipay/remoting/config/BoltClientOption.java b/src/main/java/com/alipay/remoting/config/BoltClientOption.java
new file mode 100644
index 00000000..d62185a3
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/config/BoltClientOption.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting.config;
+
+/**
+ * Supported options in client side.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2018-11-06 18:00
+ */
+public class BoltClientOption extends BoltGenericOption {
+
+ public static final BoltOption NETTY_IO_RATIO = valueOf(
+ "bolt.tcp.heartbeat.interval",
+ 15 * 1000);
+ public static final BoltOption TCP_IDLE_MAXTIMES = valueOf(
+ "bolt.tcp.heartbeat.maxtimes",
+ 3);
+
+ public static final BoltOption CONN_CREATE_TP_MIN_SIZE = valueOf(
+ "bolt.conn.create.tp.min",
+ 3);
+ public static final BoltOption CONN_CREATE_TP_MAX_SIZE = valueOf(
+ "bolt.conn.create.tp.max",
+ 8);
+ public static final BoltOption CONN_CREATE_TP_QUEUE_SIZE = valueOf(
+ "bolt.conn.create.tp.queue",
+ 50);
+ public static final BoltOption CONN_CREATE_TP_KEEPALIVE_TIME = valueOf(
+ "bolt.conn.create.tp.keepalive",
+ 60);
+
+ private BoltClientOption(String name, T defaultValue) {
+ super(name, defaultValue);
+ }
+}
diff --git a/src/main/java/com/alipay/remoting/config/BoltGenericOption.java b/src/main/java/com/alipay/remoting/config/BoltGenericOption.java
new file mode 100644
index 00000000..b548aeec
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/config/BoltGenericOption.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting.config;
+
+import com.alipay.remoting.ConnectionSelectStrategy;
+
+/**
+ * Supported options both in client and server side.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2018-11-06 17:59
+ */
+public class BoltGenericOption extends BoltOption {
+
+ /*------------ NETTY Config Start ------------*/
+ public static final BoltOption TCP_NODELAY = valueOf(
+ "bolt.tcp.nodelay",
+ true);
+ public static final BoltOption TCP_SO_REUSEADDR = valueOf(
+ "bolt.tcp.so.reuseaddr",
+ true);
+ public static final BoltOption TCP_SO_KEEPALIVE = valueOf(
+ "bolt.tcp.so.keepalive",
+ true);
+
+ public static final BoltOption NETTY_IO_RATIO = valueOf(
+ "bolt.netty.io.ratio",
+ 70);
+ public static final BoltOption NETTY_BUFFER_POOLED = valueOf(
+ "bolt.netty.buffer.pooled",
+ true);
+
+ public static final BoltOption NETTY_BUFFER_HIGH_WATER_MARK = valueOf(
+ "bolt.netty.buffer.high.watermark",
+ 64 * 1024);
+ public static final BoltOption NETTY_BUFFER_LOW_WATER_MARK = valueOf(
+ "bolt.netty.buffer.low.watermark",
+ 32 * 1024);
+
+ public static final BoltOption NETTY_EPOLL_SWITCH = valueOf(
+ "bolt.netty.epoll.switch",
+ true);
+
+ public static final BoltOption TCP_IDLE_SWITCH = valueOf(
+ "bolt.tcp.heartbeat.switch",
+ true);
+ /*------------ NETTY Config End ------------*/
+
+ /*------------ Thread Pool Config Start ------------*/
+ public static final BoltOption TP_MIN_SIZE = valueOf(
+ "bolt.tp.min",
+ 20);
+ public static final BoltOption TP_MAX_SIZE = valueOf(
+ "bolt.tp.max",
+ 400);
+ public static final BoltOption TP_QUEUE_SIZE = valueOf(
+ "bolt.tp.queue",
+ 600);
+ public static final BoltOption TP_KEEPALIVE_TIME = valueOf(
+ "bolt.tp.keepalive",
+ 60);
+
+ /*------------ Thread Pool Config End ------------*/
+
+ public static final BoltOption CONNECTION_SELECT_STRATEGY = valueOf("CONNECTION_SELECT_STRATEGY");
+
+ protected BoltGenericOption(String name, T defaultValue) {
+ super(name, defaultValue);
+ }
+}
diff --git a/src/main/java/com/alipay/remoting/config/BoltOption.java b/src/main/java/com/alipay/remoting/config/BoltOption.java
new file mode 100644
index 00000000..7652a5b9
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/config/BoltOption.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting.config;
+
+/**
+ * The base implementation class of the configuration item.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2018-11-06 17:25
+ */
+public class BoltOption {
+
+ private final String name;
+ private T defaultValue;
+
+ protected BoltOption(String name, T defaultValue) {
+ this.name = name;
+ this.defaultValue = defaultValue;
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public T defaultValue() {
+ return defaultValue;
+ }
+
+ public static BoltOption valueOf(String name) {
+ return new BoltOption(name, null);
+ }
+
+ public static BoltOption valueOf(String name, T defaultValue) {
+ return new BoltOption(name, defaultValue);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BoltOption> that = (BoltOption>) o;
+
+ return name != null ? name.equals(that.name) : that.name == null;
+ }
+
+ @Override
+ public int hashCode() {
+ return name != null ? name.hashCode() : 0;
+ }
+}
diff --git a/src/main/java/com/alipay/remoting/config/BoltOptions.java b/src/main/java/com/alipay/remoting/config/BoltOptions.java
new file mode 100644
index 00000000..cb846583
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/config/BoltOptions.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting.config;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Option carrier.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2018-11-06 17:42
+ */
+public class BoltOptions {
+
+ private ConcurrentHashMap, Object> options = new ConcurrentHashMap, Object>();
+
+ /**
+ * Get the optioned value.
+ * Return default value if option does not exist.
+ *
+ * @param option target option
+ * @return the optioned value of default value if option does not exist.
+ */
+ @SuppressWarnings("unchecked")
+ public T option(BoltOption option) {
+ Object value = options.get(option);
+ if (value == null) {
+ value = option.defaultValue();
+ }
+
+ return value == null ? null : (T) value;
+ }
+
+ /**
+ * Set up an new option with specific value.
+ * Use a value of {@code null} to remove a previous set {@link BoltOption}.
+ *
+ * @param option target option
+ * @param value option value, null for remove a previous set {@link BoltOption}.
+ * @return this BoltOptions instance
+ */
+ public BoltOptions option(BoltOption option, T value) {
+ if (value == null) {
+ options.remove(option);
+ return this;
+ }
+
+ options.put(option, value);
+ return this;
+ }
+}
diff --git a/src/main/java/com/alipay/remoting/config/BoltServerOption.java b/src/main/java/com/alipay/remoting/config/BoltServerOption.java
new file mode 100644
index 00000000..74c0ac07
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/config/BoltServerOption.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting.config;
+
+/**
+ * Supported options in server side.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2018-11-06 18:00
+ */
+public class BoltServerOption extends BoltGenericOption {
+
+ public static final BoltOption TCP_SO_BACKLOG = valueOf("bolt.tcp.so.backlog", 1024);
+
+ public static final BoltOption NETTY_EPOLL_LT = valueOf("bolt.netty.epoll.lt", true);
+
+ public static final BoltOption TCP_SERVER_IDLE = valueOf(
+ "bolt.tcp.server.idle.interval",
+ 90 * 1000);
+
+ private BoltServerOption(String name, T defaultValue) {
+ super(name, defaultValue);
+ }
+}
diff --git a/src/main/java/com/alipay/remoting/config/Configs.java b/src/main/java/com/alipay/remoting/config/Configs.java
index c7c38b6c..f60a9938 100644
--- a/src/main/java/com/alipay/remoting/config/Configs.java
+++ b/src/main/java/com/alipay/remoting/config/Configs.java
@@ -170,6 +170,7 @@ public class Configs {
public static final String CONN_THRESHOLD_DEFAULT = "3";
/** Retry detect period for ScheduledDisconnectStrategy */
+ @Deprecated
public static final String RETRY_DETECT_PERIOD = "bolt.retry.delete.period";
public static final String RETRY_DETECT_PERIOD_DEFAULT = "5000";
diff --git a/src/main/java/com/alipay/remoting/config/Configurable.java b/src/main/java/com/alipay/remoting/config/Configurable.java
new file mode 100644
index 00000000..5bafcae9
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/config/Configurable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting.config;
+
+/**
+ * Config interface.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2018-11-06 14:46
+ */
+public interface Configurable {
+
+ /**
+ * Get the option value.
+ *
+ * @param option target option
+ * @return BoltOption
+ */
+ T option(BoltOption option);
+
+ /**
+ * Allow to specify a {@link BoltOption} which is used for the {@link Configurable} instances once they got
+ * created. Use a value of {@code null} to remove a previous set {@link BoltOption}.
+ *
+ * @param option target option
+ * @param value option value, null to remove the previous option
+ * @return Configurable instance
+ */
+ Configurable option(BoltOption option, T value);
+
+}
diff --git a/src/main/java/com/alipay/remoting/config/configs/ConfigContainer.java b/src/main/java/com/alipay/remoting/config/configs/ConfigContainer.java
index 30118d2c..c62a2b82 100644
--- a/src/main/java/com/alipay/remoting/config/configs/ConfigContainer.java
+++ b/src/main/java/com/alipay/remoting/config/configs/ConfigContainer.java
@@ -30,7 +30,7 @@ public interface ConfigContainer {
* check whether a config item of a certain config type exist.
* @param configType config types in the config container, different config type can hold the same config item key
* @param configItem config items in the config container
- * @return exist then return true, not exist return alse
+ * @return exist then return true, not exist return false
*/
boolean contains(ConfigType configType, ConfigItem configItem);
diff --git a/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java b/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java
index aad3e38b..a43df9d7 100644
--- a/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java
+++ b/src/main/java/com/alipay/remoting/config/switches/ProtocolSwitch.java
@@ -26,7 +26,7 @@
*/
public class ProtocolSwitch implements Switch {
- // switche index
+ // switch index
public static final int CRC_SWITCH_INDEX = 0x000;
// default value
diff --git a/src/main/java/com/alipay/remoting/constant/Constants.java b/src/main/java/com/alipay/remoting/constant/Constants.java
new file mode 100644
index 00000000..79837162
--- /dev/null
+++ b/src/main/java/com/alipay/remoting/constant/Constants.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting.constant;
+
+/**
+ * Bolt Constants.
+ *
+ * @author chengyi (mark.lx@antfin.com) 2019-03-06 15:19
+ */
+public class Constants {
+
+ /**
+ * default expire time to remove connection pool, time unit: milliseconds
+ */
+ public static final int DEFAULT_EXPIRE_TIME = 10 * 60 * 1000;
+
+ /**
+ * default retry times when failed to get result of FutureTask
+ */
+ public static final int DEFAULT_RETRY_TIMES = 2;
+
+}
diff --git a/src/main/java/com/alipay/remoting/rpc/RpcClient.java b/src/main/java/com/alipay/remoting/rpc/RpcClient.java
index e2c6e6eb..c6397999 100644
--- a/src/main/java/com/alipay/remoting/rpc/RpcClient.java
+++ b/src/main/java/com/alipay/remoting/rpc/RpcClient.java
@@ -20,6 +20,12 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.alipay.remoting.AbstractBoltClient;
+import com.alipay.remoting.ConnectionSelectStrategy;
+import com.alipay.remoting.DefaultClientConnectionManager;
+import com.alipay.remoting.LifeCycleException;
+import com.alipay.remoting.Reconnector;
+import com.alipay.remoting.config.BoltGenericOption;
import org.slf4j.Logger;
import com.alipay.remoting.Connection;
@@ -28,8 +34,6 @@
import com.alipay.remoting.ConnectionEventProcessor;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.ConnectionMonitorStrategy;
-import com.alipay.remoting.ConnectionPool;
-import com.alipay.remoting.ConnectionSelectStrategy;
import com.alipay.remoting.DefaultConnectionManager;
import com.alipay.remoting.DefaultConnectionMonitor;
import com.alipay.remoting.InvokeCallback;
@@ -39,10 +43,7 @@
import com.alipay.remoting.RemotingAddressParser;
import com.alipay.remoting.ScheduledDisconnectStrategy;
import com.alipay.remoting.Url;
-import com.alipay.remoting.config.AbstractConfigurableInstance;
-import com.alipay.remoting.config.configs.ConfigType;
import com.alipay.remoting.config.switches.GlobalSwitch;
-import com.alipay.remoting.connection.ConnectionFactory;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.log.BoltLoggerFactory;
import com.alipay.remoting.rpc.protocol.UserProcessor;
@@ -54,300 +55,165 @@
* @author jiangping
* @version $Id: RpcClient.java, v 0.1 2015-9-23 PM4:03:28 tao Exp $
*/
-public class RpcClient extends AbstractConfigurableInstance {
-
- /** logger */
- private static final Logger logger = BoltLoggerFactory
- .getLogger("RpcRemoting");
-
- private ConcurrentHashMap> userProcessors = new ConcurrentHashMap>();
- /** connection factory */
- private ConnectionFactory connectionFactory = new RpcConnectionFactory(
- userProcessors,
- this);
+public class RpcClient extends AbstractBoltClient {
- /** connection event handler */
- private ConnectionEventHandler connectionEventHandler = new RpcConnectionEventHandler(
- switches());
+ private static final Logger logger = BoltLoggerFactory
+ .getLogger("RpcRemoting");
- /** reconnect manager */
- private ReconnectManager reconnectManager;
+ private final RpcTaskScanner taskScanner;
+ private final ConcurrentHashMap> userProcessors;
+ private final ConnectionEventHandler connectionEventHandler;
+ private final ConnectionEventListener connectionEventListener;
- /** connection event listener */
- private ConnectionEventListener connectionEventListener = new ConnectionEventListener();
+ private DefaultClientConnectionManager connectionManager;
+ private Reconnector reconnectManager;
+ private RemotingAddressParser addressParser;
+ private DefaultConnectionMonitor connectionMonitor;
+ private ConnectionMonitorStrategy monitorStrategy;
- /** address parser to get custom args */
- private RemotingAddressParser addressParser;
+ // used in RpcClientAdapter (bolt-tr-adapter)
+ protected RpcRemoting rpcRemoting;
- /** connection select strategy */
- private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(
- switches());
-
- /** connection manager */
- private DefaultConnectionManager connectionManager = new DefaultConnectionManager(
- connectionSelectStrategy,
- connectionFactory,
- connectionEventHandler,
- connectionEventListener,
- switches());
-
- /** rpc remoting */
- protected RpcRemoting rpcRemoting;
-
- /** task scanner */
- private RpcTaskScanner taskScanner = new RpcTaskScanner();
+ public RpcClient() {
+ this.taskScanner = new RpcTaskScanner();
+ this.userProcessors = new ConcurrentHashMap>();
+ this.connectionEventHandler = new RpcConnectionEventHandler(switches());
+ this.connectionEventListener = new ConnectionEventListener();
+ }
- /** connection monitor */
- private DefaultConnectionMonitor connectionMonitor;
+ /**
+ * Please use {@link RpcClient#startup()} instead
+ */
+ @Deprecated
+ public void init() {
+ startup();
+ }
- /** connection monitor strategy */
- private ConnectionMonitorStrategy monitorStrategy;
+ /**
+ * Shutdown.
+ *
+ * Notice:
+ *
Rpc client can not be used any more after shutdown.
+ * If you need, you should destroy it, and instantiate another one.
+ */
+ @Override
+ public void shutdown() {
+ super.shutdown();
- public RpcClient() {
- super(ConfigType.CLIENT_SIDE);
+ this.connectionManager.shutdown();
+ logger.warn("Close all connections from client side!");
+ this.taskScanner.shutdown();
+ logger.warn("Rpc client shutdown!");
+ if (reconnectManager != null) {
+ reconnectManager.shutdown();
+ }
+ if (connectionMonitor != null) {
+ connectionMonitor.shutdown();
+ }
}
- public void init() {
+ @Override
+ public void startup() throws LifeCycleException {
+ super.startup();
+
if (this.addressParser == null) {
this.addressParser = new RpcAddressParser();
}
+
+ ConnectionSelectStrategy connectionSelectStrategy = option(BoltGenericOption.CONNECTION_SELECT_STRATEGY);
+ if (connectionSelectStrategy == null) {
+ connectionSelectStrategy = new RandomSelectStrategy(switches());
+ }
+ this.connectionManager = new DefaultClientConnectionManager(connectionSelectStrategy,
+ new RpcConnectionFactory(userProcessors, this), connectionEventHandler,
+ connectionEventListener, switches());
this.connectionManager.setAddressParser(this.addressParser);
- this.connectionManager.init();
+ this.connectionManager.startup();
this.rpcRemoting = new RpcClientRemoting(new RpcCommandFactory(), this.addressParser,
this.connectionManager);
this.taskScanner.add(this.connectionManager);
- this.taskScanner.start();
+ this.taskScanner.startup();
if (switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH)) {
if (monitorStrategy == null) {
- ScheduledDisconnectStrategy strategy = new ScheduledDisconnectStrategy();
- connectionMonitor = new DefaultConnectionMonitor(strategy, this.connectionManager);
+ connectionMonitor = new DefaultConnectionMonitor(new ScheduledDisconnectStrategy(),
+ this.connectionManager);
} else {
connectionMonitor = new DefaultConnectionMonitor(monitorStrategy,
this.connectionManager);
}
- connectionMonitor.start();
+ connectionMonitor.startup();
logger.warn("Switch on connection monitor");
}
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
reconnectManager = new ReconnectManager(connectionManager);
- connectionEventHandler.setReconnectManager(reconnectManager);
+ reconnectManager.startup();
+
+ connectionEventHandler.setReconnector(reconnectManager);
logger.warn("Switch on reconnect manager");
}
}
- /**
- * Shutdown.
- *
- * Notice:
- *
Rpc client can not be used any more after shutdown.
- * If you need, you should destroy it, and instantiate another one.
- */
- public void shutdown() {
- this.connectionManager.removeAll();
- logger.warn("Close all connections from client side!");
- this.taskScanner.shutdown();
- logger.warn("Rpc client shutdown!");
- if (reconnectManager != null) {
- reconnectManager.stop();
- }
- if (connectionMonitor != null) {
- connectionMonitor.destroy();
- }
+ @Override
+ public void oneway(final String address, final Object request) throws RemotingException,
+ InterruptedException {
+ this.rpcRemoting.oneway(address, request, null);
}
- /**
- * One way invocation using a string address, address format example - 127.0.0.1:12200?key1=value1&key2=value2
- *
- * Notice:
- *
- * - DO NOT modify the request object concurrently when this method is called.
- * - When do invocation, use the string address to find a available connection, if none then create one.
- *
- * - You can use {@link RpcConfigs#CONNECT_TIMEOUT_KEY} to specify connection timeout, time unit is milliseconds, e.g [127.0.0.1:12200?_CONNECTTIMEOUT=3000]
- *
- You can use {@link RpcConfigs#CONNECTION_NUM_KEY} to specify connection number for each ip and port, e.g [127.0.0.1:12200?_CONNECTIONNUM=30]
- *
- You can use {@link RpcConfigs#CONNECTION_WARMUP_KEY} to specify whether need warmup all connections for the first time you call this method, e.g [127.0.0.1:12200?_CONNECTIONWARMUP=false]
- *
- * - You should use {@link #closeConnection(String addr)} to close it if you want.
- *
- *
- * @param addr
- * @param request
- * @throws RemotingException
- * @throws InterruptedException
- */
- public void oneway(final String addr, final Object request) throws RemotingException,
- InterruptedException {
- this.rpcRemoting.oneway(addr, request, null);
+ @Override
+ public void oneway(final String address, final Object request, final InvokeContext invokeContext)
+ throws RemotingException,
+ InterruptedException {
+ this.rpcRemoting.oneway(address, request, invokeContext);
}
- /**
- * Oneway invocation with a {@link InvokeContext}, common api notice please see {@link #oneway(Connection, Object)}
- *
- * @param addr
- * @param request
- * @param invokeContext
- * @throws RemotingException
- * @throws InterruptedException
- */
- public void oneway(final String addr, final Object request, final InvokeContext invokeContext)
- throws RemotingException,
- InterruptedException {
- this.rpcRemoting.oneway(addr, request, invokeContext);
- }
-
- /**
- * One way invocation using a parsed {@link Url}
- *
- * Notice:
- *
- * - DO NOT modify the request object concurrently when this method is called.
- * - When do invocation, use the parsed {@link Url} to find a available connection, if none then create one.
- *
- * - You can use {@link Url#setConnectTimeout} to specify connection timeout, time unit is milliseconds.
- *
- You can use {@link Url#setConnNum} to specify connection number for each ip and port.
- *
- You can use {@link Url#setConnWarmup} to specify whether need warmup all connections for the first time you call this method.
- *
- * - You should use {@link #closeConnection(Url url)} to close it if you want.
- *
- *
- * @param url
- * @param request
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public void oneway(final Url url, final Object request) throws RemotingException,
InterruptedException {
this.rpcRemoting.oneway(url, request, null);
}
- /**
- * Oneway invocation with a {@link InvokeContext}, common api notice please see {@link #oneway(Url, Object)}
- *
- * @param url
- * @param request
- * @param invokeContext
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public void oneway(final Url url, final Object request, final InvokeContext invokeContext)
throws RemotingException,
InterruptedException {
this.rpcRemoting.oneway(url, request, invokeContext);
}
- /**
- * One way invocation using a {@link Connection}
- *
- * Notice:
- * DO NOT modify the request object concurrently when this method is called.
- *
- * @param conn
- * @param request
- * @throws RemotingException
- */
+ @Override
public void oneway(final Connection conn, final Object request) throws RemotingException {
this.rpcRemoting.oneway(conn, request, null);
}
- /**
- * Oneway invocation with a {@link InvokeContext}, common api notice please see {@link #oneway(Connection, Object)}
- *
- * @param conn
- * @param request
- * @param invokeContext
- * @throws RemotingException
- */
+ @Override
public void oneway(final Connection conn, final Object request,
final InvokeContext invokeContext) throws RemotingException {
this.rpcRemoting.oneway(conn, request, invokeContext);
}
- /**
- * Synchronous invocation using a string address, address format example - 127.0.0.1:12200?key1=value1&key2=value2
- *
- * Notice:
- *
- * - DO NOT modify the request object concurrently when this method is called.
- * - When do invocation, use the string address to find a available connection, if none then create one.
- *
- * - You can use {@link RpcConfigs#CONNECT_TIMEOUT_KEY} to specify connection timeout, time unit is milliseconds, e.g [127.0.0.1:12200?_CONNECTTIMEOUT=3000]
- *
- You can use {@link RpcConfigs#CONNECTION_NUM_KEY} to specify connection number for each ip and port, e.g [127.0.0.1:12200?_CONNECTIONNUM=30]
- *
- You can use {@link RpcConfigs#CONNECTION_WARMUP_KEY} to specify whether need warmup all connections for the first time you call this method, e.g [127.0.0.1:12200?_CONNECTIONWARMUP=false]
- *
- * - You should use {@link #closeConnection(String addr)} to close it if you want.
- *
- *
- * @param addr
- * @param request
- * @param timeoutMillis
- * @return Object
- * @throws RemotingException
- * @throws InterruptedException
- */
- public Object invokeSync(final String addr, final Object request, final int timeoutMillis)
- throws RemotingException,
- InterruptedException {
- return this.rpcRemoting.invokeSync(addr, request, null, timeoutMillis);
+ @Override
+ public Object invokeSync(final String address, final Object request, final int timeoutMillis)
+ throws RemotingException,
+ InterruptedException {
+ return this.rpcRemoting.invokeSync(address, request, null, timeoutMillis);
}
- /**
- * Synchronous invocation with a {@link InvokeContext}, common api notice please see {@link #invokeSync(String, Object, int)}
- *
- * @param addr
- * @param request
- * @param invokeContext
- * @param timeoutMillis
- * @return
- * @throws RemotingException
- * @throws InterruptedException
- */
- public Object invokeSync(final String addr, final Object request,
+ @Override
+ public Object invokeSync(final String address, final Object request,
final InvokeContext invokeContext, final int timeoutMillis)
throws RemotingException,
InterruptedException {
- return this.rpcRemoting.invokeSync(addr, request, invokeContext, timeoutMillis);
+ return this.rpcRemoting.invokeSync(address, request, invokeContext, timeoutMillis);
}
- /**
- * Synchronous invocation using a parsed {@link Url}
- *
- * Notice:
- *
- * - DO NOT modify the request object concurrently when this method is called.
- * - When do invocation, use the parsed {@link Url} to find a available connection, if none then create one.
- *
- * - You can use {@link Url#setConnectTimeout} to specify connection timeout, time unit is milliseconds.
- *
- You can use {@link Url#setConnNum} to specify connection number for each ip and port.
- *
- You can use {@link Url#setConnWarmup} to specify whether need warmup all connections for the first time you call this method.
- *
- * - You should use {@link #closeConnection(Url url)} to close it if you want.
- *
- *
- * @param url
- * @param request
- * @param timeoutMillis
- * @return Object
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public Object invokeSync(final Url url, final Object request, final int timeoutMillis)
throws RemotingException,
InterruptedException {
return this.invokeSync(url, request, null, timeoutMillis);
}
- /**
- * Synchronous invocation with a {@link InvokeContext}, common api notice please see {@link #invokeSync(Url, Object, int)}
- *
- * @param url
- * @param request
- * @param invokeContext
- * @param timeoutMillis
- * @return
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public Object invokeSync(final Url url, final Object request,
final InvokeContext invokeContext, final int timeoutMillis)
throws RemotingException,
@@ -355,36 +221,14 @@ public Object invokeSync(final Url url, final Object request,
return this.rpcRemoting.invokeSync(url, request, invokeContext, timeoutMillis);
}
- /**
- * Synchronous invocation using a {@link Connection}
- *
- * Notice:
- * DO NOT modify the request object concurrently when this method is called.
- *
- * @param conn
- * @param request
- * @param timeoutMillis
- * @return Object
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public Object invokeSync(final Connection conn, final Object request, final int timeoutMillis)
throws RemotingException,
InterruptedException {
return this.rpcRemoting.invokeSync(conn, request, null, timeoutMillis);
}
- /**
- * Synchronous invocation with a {@link InvokeContext}, common api notice please see {@link #invokeSync(Connection, Object, int)}
- *
- * @param conn
- * @param request
- * @param invokeContext
- * @param timeoutMillis
- * @return Object
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public Object invokeSync(final Connection conn, final Object request,
final InvokeContext invokeContext, final int timeoutMillis)
throws RemotingException,
@@ -392,93 +236,29 @@ public Object invokeSync(final Connection conn, final Object request,
return this.rpcRemoting.invokeSync(conn, request, invokeContext, timeoutMillis);
}
- /**
- * Future invocation using a string address, address format example - 127.0.0.1:12200?key1=value1&key2=value2
- * You can get result use the returned {@link RpcResponseFuture}.
- *
- * Notice:
- *
- * - DO NOT modify the request object concurrently when this method is called.
- * - When do invocation, use the string address to find a available connection, if none then create one.
- *
- * - You can use {@link RpcConfigs#CONNECT_TIMEOUT_KEY} to specify connection timeout, time unit is milliseconds, e.g [127.0.0.1:12200?_CONNECTTIMEOUT=3000]
- *
- You can use {@link RpcConfigs#CONNECTION_NUM_KEY} to specify connection number for each ip and port, e.g [127.0.0.1:12200?_CONNECTIONNUM=30]
- *
- You can use {@link RpcConfigs#CONNECTION_WARMUP_KEY} to specify whether need warmup all connections for the first time you call this method, e.g [127.0.0.1:12200?_CONNECTIONWARMUP=false]
- *
- * - You should use {@link #closeConnection(String addr)} to close it if you want.
- *
- *
- * @param addr
- * @param request
- * @param timeoutMillis
- * @return RpcResponseFuture
- * @throws RemotingException
- * @throws InterruptedException
- */
- public RpcResponseFuture invokeWithFuture(final String addr, final Object request,
+ @Override
+ public RpcResponseFuture invokeWithFuture(final String address, final Object request,
final int timeoutMillis) throws RemotingException,
InterruptedException {
- return this.rpcRemoting.invokeWithFuture(addr, request, null, timeoutMillis);
+ return this.rpcRemoting.invokeWithFuture(address, request, null, timeoutMillis);
}
- /**
- * Future invocation with a {@link InvokeContext}, common api notice please see {@link #invokeWithFuture(String, Object, int)}
- *
- * @param addr
- * @param request
- * @param invokeContext
- * @param timeoutMillis
- * @return RpcResponseFuture
- * @throws RemotingException
- * @throws InterruptedException
- */
- public RpcResponseFuture invokeWithFuture(final String addr, final Object request,
+ @Override
+ public RpcResponseFuture invokeWithFuture(final String address, final Object request,
final InvokeContext invokeContext,
final int timeoutMillis) throws RemotingException,
InterruptedException {
- return this.rpcRemoting.invokeWithFuture(addr, request, invokeContext, timeoutMillis);
+ return this.rpcRemoting.invokeWithFuture(address, request, invokeContext, timeoutMillis);
}
- /**
- * Future invocation using a parsed {@link Url}
- * You can get result use the returned {@link RpcResponseFuture}.
- *
- * Notice:
- *
- * - DO NOT modify the request object concurrently when this method is called.
- * - When do invocation, use the parsed {@link Url} to find a available connection, if none then create one.
- *
- * - You can use {@link Url#setConnectTimeout} to specify connection timeout, time unit is milliseconds.
- *
- You can use {@link Url#setConnNum} to specify connection number for each ip and port.
- *
- You can use {@link Url#setConnWarmup} to specify whether need warmup all connections for the first time you call this method.
- *
- * - You should use {@link #closeConnection(Url url)} to close it if you want.
- *
- *
- * @param url
- * @param request
- * @param timeoutMillis
- * @return
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public RpcResponseFuture invokeWithFuture(final Url url, final Object request,
final int timeoutMillis) throws RemotingException,
InterruptedException {
return this.rpcRemoting.invokeWithFuture(url, request, null, timeoutMillis);
}
- /**
- * Future invocation with a {@link InvokeContext}, common api notice please see {@link #invokeWithFuture(Url, Object, int)}
- *
- * @param url
- * @param request
- * @param invokeContext
- * @param timeoutMillis
- * @return
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public RpcResponseFuture invokeWithFuture(final Url url, final Object request,
final InvokeContext invokeContext,
final int timeoutMillis) throws RemotingException,
@@ -486,113 +266,38 @@ public RpcResponseFuture invokeWithFuture(final Url url, final Object request,
return this.rpcRemoting.invokeWithFuture(url, request, invokeContext, timeoutMillis);
}
- /**
- * Future invocation using a {@link Connection}
- * You can get result use the returned {@link RpcResponseFuture}.
- *
- * Notice:
- * DO NOT modify the request object concurrently when this method is called.
- *
- * @param conn
- * @param request
- * @param timeoutMillis
- * @return
- * @throws RemotingException
- */
+ @Override
public RpcResponseFuture invokeWithFuture(final Connection conn, final Object request,
int timeoutMillis) throws RemotingException {
return this.rpcRemoting.invokeWithFuture(conn, request, null, timeoutMillis);
}
- /**
- * Future invocation with a {@link InvokeContext}, common api notice please see {@link #invokeWithFuture(Connection, Object, int)}
- *
- * @param conn
- * @param request
- * @param invokeContext
- * @param timeoutMillis
- * @return
- * @throws RemotingException
- */
+ @Override
public RpcResponseFuture invokeWithFuture(final Connection conn, final Object request,
final InvokeContext invokeContext, int timeoutMillis)
throws RemotingException {
return this.rpcRemoting.invokeWithFuture(conn, request, invokeContext, timeoutMillis);
}
- /**
- * Callback invocation using a string address, address format example - 127.0.0.1:12200?key1=value1&key2=value2
- * You can specify an implementation of {@link InvokeCallback} to get the result.
- *
- * Notice:
- *
- * - DO NOT modify the request object concurrently when this method is called.
- * - When do invocation, use the string address to find a available connection, if none then create one.
- *
- * - You can use {@link RpcConfigs#CONNECT_TIMEOUT_KEY} to specify connection timeout, time unit is milliseconds, e.g [127.0.0.1:12200?_CONNECTTIMEOUT=3000]
- *
- You can use {@link RpcConfigs#CONNECTION_NUM_KEY} to specify connection number for each ip and port, e.g [127.0.0.1:12200?_CONNECTIONNUM=30]
- *
- You can use {@link RpcConfigs#CONNECTION_WARMUP_KEY} to specify whether need warmup all connections for the first time you call this method, e.g [127.0.0.1:12200?_CONNECTIONWARMUP=false]
- *
- * - You should use {@link #closeConnection(String addr)} to close it if you want.
- *
- *
- * @param addr
- * @param request
- * @param invokeCallback
- * @param timeoutMillis
- * @throws RemotingException
- * @throws InterruptedException
- */
- public void invokeWithCallback(final String addr, final Object request,
+ @Override
+ public void invokeWithCallback(final String address, final Object request,
final InvokeCallback invokeCallback, final int timeoutMillis)
throws RemotingException,
InterruptedException {
- this.rpcRemoting.invokeWithCallback(addr, request, null, invokeCallback, timeoutMillis);
+ this.rpcRemoting.invokeWithCallback(address, request, null, invokeCallback, timeoutMillis);
}
- /**
- * Callback invocation with a {@link InvokeContext}, common api notice please see {@link #invokeWithCallback(String, Object, InvokeCallback, int)}
- *
- * @param addr
- * @param request
- * @param invokeContext
- * @param invokeCallback
- * @param timeoutMillis
- * @throws RemotingException
- * @throws InterruptedException
- */
- public void invokeWithCallback(final String addr, final Object request,
+ @Override
+ public void invokeWithCallback(final String address, final Object request,
final InvokeContext invokeContext,
final InvokeCallback invokeCallback, final int timeoutMillis)
throws RemotingException,
InterruptedException {
- this.rpcRemoting.invokeWithCallback(addr, request, invokeContext, invokeCallback,
+ this.rpcRemoting.invokeWithCallback(address, request, invokeContext, invokeCallback,
timeoutMillis);
}
- /**
- * Callback invocation using a parsed {@link Url}
- * You can specify an implementation of {@link InvokeCallback} to get the result.
- *
- * Notice:
- *
- * - DO NOT modify the request object concurrently when this method is called.
- * - When do invocation, use the parsed {@link Url} to find a available connection, if none then create one.
- *
- * - You can use {@link Url#setConnectTimeout} to specify connection timeout, time unit is milliseconds.
- *
- You can use {@link Url#setConnNum} to specify connection number for each ip and port.
- *
- You can use {@link Url#setConnWarmup} to specify whether need warmup all connections for the first time you call this method.
- *
- * - You should use {@link #closeConnection(Url url)} to close it if you want.
- *
- *
- * @param url
- * @param request
- * @param invokeCallback
- * @param timeoutMillis
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public void invokeWithCallback(final Url url, final Object request,
final InvokeCallback invokeCallback, final int timeoutMillis)
throws RemotingException,
@@ -600,17 +305,7 @@ public void invokeWithCallback(final Url url, final Object request,
this.rpcRemoting.invokeWithCallback(url, request, null, invokeCallback, timeoutMillis);
}
- /**
- * Callback invocation with a {@link InvokeContext}, common api notice please see {@link #invokeWithCallback(Url, Object, InvokeCallback, int)}
- *
- * @param url
- * @param request
- * @param invokeContext
- * @param invokeCallback
- * @param timeoutMillis
- * @throws RemotingException
- * @throws InterruptedException
- */
+ @Override
public void invokeWithCallback(final Url url, final Object request,
final InvokeContext invokeContext,
final InvokeCallback invokeCallback, final int timeoutMillis)
@@ -620,35 +315,14 @@ public void invokeWithCallback(final Url url, final Object request,
timeoutMillis);
}
- /**
- * Callback invocation using a {@link Connection}
- * You can specify an implementation of {@link InvokeCallback} to get the result.
- *
- * Notice:
- * DO NOT modify the request object concurrently when this method is called.
- *
- * @param conn
- * @param request
- * @param invokeCallback
- * @param timeoutMillis
- * @throws RemotingException
- */
+ @Override
public void invokeWithCallback(final Connection conn, final Object request,
final InvokeCallback invokeCallback, final int timeoutMillis)
throws RemotingException {
this.rpcRemoting.invokeWithCallback(conn, request, null, invokeCallback, timeoutMillis);
}
- /**
- * Callback invocation with a {@link InvokeContext}, common api notice please see {@link #invokeWithCallback(Connection, Object, InvokeCallback, int)}
- *
- * @param conn
- * @param request
- * @param invokeContext
- * @param invokeCallback
- * @param timeoutMillis
- * @throws RemotingException
- */
+ @Override
public void invokeWithCallback(final Connection conn, final Object request,
final InvokeContext invokeContext,
final InvokeCallback invokeCallback, final int timeoutMillis)
@@ -657,146 +331,63 @@ public void invokeWithCallback(final Connection conn, final Object request,
timeoutMillis);
}
- /**
- * Add processor to process connection event.
- *
- * @param type
- * @param processor
- */
+ @Override
public void addConnectionEventProcessor(ConnectionEventType type,
ConnectionEventProcessor processor) {
this.connectionEventListener.addConnectionEventProcessor(type, processor);
}
- /**
- * Use UserProcessorRegisterHelper{@link UserProcessorRegisterHelper} to help register user processor for client side.
- *
- * @param processor
- * @throws RemotingException
- */
-
+ @Override
public void registerUserProcessor(UserProcessor> processor) {
UserProcessorRegisterHelper.registerUserProcessor(processor, this.userProcessors);
}
- /**
- * Create a stand alone connection using ip and port.
- *
- * Notice:
- *
Each time you call this method, will create a new connection.
- * Bolt will not control this connection.
- * You should use {@link #closeStandaloneConnection} to close it.
- *
- * @param ip
- * @param port
- * @param connectTimeout
- * @return
- * @throws RemotingException
- */
+ @Override
public Connection createStandaloneConnection(String ip, int port, int connectTimeout)
throws RemotingException {
return this.connectionManager.create(ip, port, connectTimeout);
}
- /**
- * Create a stand alone connection using address, address format example - 127.0.0.1:12200
- *
- * Notice:
- *
- * - Each time you can this method, will create a new connection.
- *
- Bolt will not control this connection.
- *
- You should use {@link #closeStandaloneConnection} to close it.
- *
- *
- * @param addr
- * @param connectTimeout
- * @return
- * @throws RemotingException
- */
- public Connection createStandaloneConnection(String addr, int connectTimeout)
- throws RemotingException {
- return this.connectionManager.create(addr, connectTimeout);
+ @Override
+ public Connection createStandaloneConnection(String address, int connectTimeout)
+ throws RemotingException {
+ return this.connectionManager.create(address, connectTimeout);
}
- /**
- * Close a standalone connection
- *
- * @param conn
- */
+ @Override
public void closeStandaloneConnection(Connection conn) {
if (null != conn) {
conn.close();
}
}
- /**
- * Get a connection using address, address format example - 127.0.0.1:12200?key1=value1&key2=value2
- *
- * Notice:
- *
- * - Get a connection, if none then create.
- *
- * - You can use {@link RpcConfigs#CONNECT_TIMEOUT_KEY} to specify connection timeout, time unit is milliseconds, e.g [127.0.0.1:12200?_CONNECTTIMEOUT=3000]
- *
- You can use {@link RpcConfigs#CONNECTION_NUM_KEY} to specify connection number for each ip and port, e.g [127.0.0.1:12200?_CONNECTIONNUM=30]
- *
- You can use {@link RpcConfigs#CONNECTION_WARMUP_KEY} to specify whether need warmup all connections for the first time you call this method, e.g [127.0.0.1:12200?_CONNECTIONWARMUP=false]
- *
- * - Bolt will control this connection in {@link ConnectionPool}
- *
- You should use {@link #closeConnection(String addr)} to close it.
- *
- * @param addr
- * @param connectTimeout this is prior to url args {@link RpcConfigs#CONNECT_TIMEOUT_KEY}
- * @return
- * @throws RemotingException
- */
- public Connection getConnection(String addr, int connectTimeout) throws RemotingException,
- InterruptedException {
- Url url = this.addressParser.parse(addr);
+ @Override
+ public Connection getConnection(String address, int connectTimeout) throws RemotingException,
+ InterruptedException {
+ Url url = this.addressParser.parse(address);
return this.getConnection(url, connectTimeout);
}
- /**
- * Get a connection using a {@link Url}.
- *
- * Notice:
- *
- * - Get a connection, if none then create.
- *
- Bolt will control this connection in {@link com.alipay.remoting.ConnectionPool}
- *
- You should use {@link #closeConnection(Url url)} to close it.
- *
- *
- * @param url
- * @param connectTimeout this is prior to url args {@link RpcConfigs#CONNECT_TIMEOUT_KEY}
- * @return
- * @throws RemotingException
- */
+ @Override
public Connection getConnection(Url url, int connectTimeout) throws RemotingException,
InterruptedException {
url.setConnectTimeout(connectTimeout);
return this.connectionManager.getAndCreateIfAbsent(url);
}
- /**
- * get all connections managed by rpc client
- *
- * @return map key is ip+port, value is a list of connections of this key.
- */
+ @Override
public Map> getAllManagedConnections() {
return this.connectionManager.getAll();
}
- /**
- * check connection, the address format example - 127.0.0.1:12200?key1=value1&key2=value2
- *
- * @param addr
- * @throws RemotingException
- * @return true if and only if there is a connection, and the connection is active and writable;else return false
- */
- public boolean checkConnection(String addr) {
- Url url = this.addressParser.parse(addr);
+ @Override
+ public boolean checkConnection(String address) {
+ Url url = this.addressParser.parse(address);
Connection conn = this.connectionManager.get(url.getUniqueKey());
try {
this.connectionManager.check(conn);
} catch (Exception e) {
+ logger.warn("check failed. connection: {}", conn, e);
return false;
}
return true;
@@ -810,156 +401,91 @@ public boolean checkConnection(String addr) {
public void closeConnection(String addr) {
Url url = this.addressParser.parse(addr);
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
- reconnectManager.addCancelUrl(url);
+ reconnectManager.disableReconnect(url);
}
this.connectionManager.remove(url.getUniqueKey());
}
- /**
- * Close all connections of a {@link Url}
- *
- * @param url
- */
+ @Override
public void closeConnection(Url url) {
if (switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
- reconnectManager.addCancelUrl(url);
+ reconnectManager.disableReconnect(url);
}
this.connectionManager.remove(url.getUniqueKey());
}
- /**
- * Enable heart beat for a certain connection.
- * If this address not connected, then do nothing.
- *
- * Notice: this method takes no effect on a stand alone connection.
- *
- * @param addr
- */
- public void enableConnHeartbeat(String addr) {
- Url url = this.addressParser.parse(addr);
+ @Override
+ public void enableConnHeartbeat(String address) {
+ Url url = this.addressParser.parse(address);
this.enableConnHeartbeat(url);
}
- /**
- * Enable heart beat for a certain connection.
- * If this {@link Url} not connected, then do nothing.
- *
- * Notice: this method takes no effect on a stand alone connection.
- *
- * @param url
- */
+ @Override
public void enableConnHeartbeat(Url url) {
if (null != url) {
this.connectionManager.enableHeartbeat(this.connectionManager.get(url.getUniqueKey()));
}
}
- /**
- * Disable heart beat for a certain connection.
- * If this addr not connected, then do nothing.
- *
- * Notice: this method takes no effect on a stand alone connection.
- *
- * @param addr
- */
- public void disableConnHeartbeat(String addr) {
- Url url = this.addressParser.parse(addr);
+ @Override
+ public void disableConnHeartbeat(String address) {
+ Url url = this.addressParser.parse(address);
this.disableConnHeartbeat(url);
}
- /**
- * Disable heart beat for a certain connection.
- * If this {@link Url} not connected, then do nothing.
- *
- * Notice: this method takes no effect on a stand alone connection.
- *
- * @param url
- */
+ @Override
public void disableConnHeartbeat(Url url) {
if (null != url) {
this.connectionManager.disableHeartbeat(this.connectionManager.get(url.getUniqueKey()));
}
}
- /**
- * enable connection reconnect switch on
- *
- * Notice: This api should be called before {@link RpcClient#init()}
- */
+ @Override
public void enableReconnectSwitch() {
this.switches().turnOn(GlobalSwitch.CONN_RECONNECT_SWITCH);
}
- /**
- * disable connection reconnect switch off
- *
- * Notice: This api should be called before {@link RpcClient#init()}
- */
+ @Override
public void disableReconnectSwith() {
this.switches().turnOff(GlobalSwitch.CONN_RECONNECT_SWITCH);
}
- /**
- * is reconnect switch on
- * @return
- */
+ @Override
public boolean isReconnectSwitchOn() {
return this.switches().isOn(GlobalSwitch.CONN_RECONNECT_SWITCH);
}
- /**
- * enable connection monitor switch on
- */
+ @Override
public void enableConnectionMonitorSwitch() {
this.switches().turnOn(GlobalSwitch.CONN_MONITOR_SWITCH);
}
- /**
- * disable connection monitor switch off
- *
- * Notice: This api should be called before {@link RpcClient#init()}
- */
+ @Override
public void disableConnectionMonitorSwitch() {
this.switches().turnOff(GlobalSwitch.CONN_MONITOR_SWITCH);
}
- /**
- * is connection monitor switch on
- * @return
- */
+ @Override
public boolean isConnectionMonitorSwitchOn() {
return this.switches().isOn(GlobalSwitch.CONN_MONITOR_SWITCH);
}
- // ~~~ getter and setter
-
- protected DefaultConnectionManager getConnectionManager() {
+ @Override
+ public DefaultConnectionManager getConnectionManager() {
return this.connectionManager;
}
- /**
- * Getter method for property addressParser.
- *
- * @return property value of addressParser
- */
+ @Override
public RemotingAddressParser getAddressParser() {
return this.addressParser;
}
- /**
- * Setter method for property addressParser.
- *
- * @param addressParser value to be assigned to property addressParser
- */
+ @Override
public void setAddressParser(RemotingAddressParser addressParser) {
this.addressParser = addressParser;
}
- /**
- * Setter method for property monitorStrategy.
- *
- * @param monitorStrategy value to be assigned to property monitorStrategy
- */
+ @Override
public void setMonitorStrategy(ConnectionMonitorStrategy monitorStrategy) {
this.monitorStrategy = monitorStrategy;
}
diff --git a/src/main/java/com/alipay/remoting/rpc/RpcServer.java b/src/main/java/com/alipay/remoting/rpc/RpcServer.java
index 3c751b80..3d60c455 100644
--- a/src/main/java/com/alipay/remoting/rpc/RpcServer.java
+++ b/src/main/java/com/alipay/remoting/rpc/RpcServer.java
@@ -21,6 +21,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import com.alipay.remoting.ConnectionSelectStrategy;
+import com.alipay.remoting.DefaultServerConnectionManager;
import org.slf4j.Logger;
import com.alipay.remoting.AbstractRemotingServer;
@@ -123,7 +125,7 @@ public class RpcServer extends AbstractRemotingServer {
private RemotingAddressParser addressParser;
/** connection manager */
- private DefaultConnectionManager connectionManager;
+ private DefaultServerConnectionManager connectionManager;
/** rpc remoting */
protected RpcRemoting rpcRemoting;
@@ -227,8 +229,12 @@ protected void doInit() {
this.addressParser = new RpcAddressParser();
}
if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)) {
+ // in server side, do not care the connection service state, so use null instead of global switch
+ ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(null);
+ this.connectionManager = new DefaultServerConnectionManager(connectionSelectStrategy);
+ this.connectionManager.startup();
+
this.connectionEventHandler = new RpcConnectionEventHandler(switches());
- this.connectionManager = new DefaultConnectionManager(new RandomSelectStrategy());
this.connectionEventHandler.setConnectionManager(this.connectionManager);
this.connectionEventHandler.setConnectionEventListener(this.connectionEventListener);
} else {
@@ -322,7 +328,7 @@ protected boolean doStop() {
}
if (this.switches().isOn(GlobalSwitch.SERVER_MANAGE_CONNECTION_SWITCH)
&& null != this.connectionManager) {
- this.connectionManager.removeAll();
+ this.connectionManager.shutdown();
logger.warn("Close all connections from server side!");
}
logger.warn("Rpc Server stopped!");
diff --git a/src/main/java/com/alipay/remoting/rpc/RpcTaskScanner.java b/src/main/java/com/alipay/remoting/rpc/RpcTaskScanner.java
index 5b9c5b3f..66c831fe 100644
--- a/src/main/java/com/alipay/remoting/rpc/RpcTaskScanner.java
+++ b/src/main/java/com/alipay/remoting/rpc/RpcTaskScanner.java
@@ -22,6 +22,8 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import com.alipay.remoting.AbstractLifeCycle;
+import com.alipay.remoting.LifeCycleException;
import org.slf4j.Logger;
import com.alipay.remoting.NamedThreadFactory;
@@ -34,19 +36,24 @@
* @author jiangping
* @version $Id: RpcTaskScanner.java, v 0.1 Mar 4, 2016 3:30:52 PM tao Exp $
*/
-public class RpcTaskScanner {
- private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");
+public class RpcTaskScanner extends AbstractLifeCycle {
- private ScheduledExecutorService scheduledService = new ScheduledThreadPoolExecutor(1,
- new NamedThreadFactory(
- "RpcTaskScannerThread", true));
+ private static final Logger logger = BoltLoggerFactory.getLogger("RpcRemoting");
- private List scanList = new LinkedList();
+ private final List scanList;
- /**
- * Start!
- */
- public void start() {
+ private ScheduledExecutorService scheduledService;
+
+ public RpcTaskScanner() {
+ this.scanList = new LinkedList();
+ }
+
+ @Override
+ public void startup() throws LifeCycleException {
+ super.startup();
+
+ scheduledService = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory(
+ "RpcTaskScannerThread", true));
scheduledService.scheduleWithFixedDelay(new Runnable() {
@Override
@@ -63,20 +70,26 @@ public void run() {
}, 10000, 10000, TimeUnit.MILLISECONDS);
}
+ @Override
+ public void shutdown() throws LifeCycleException {
+ super.shutdown();
+
+ scheduledService.shutdown();
+ }
+
/**
- * Add scan target.
- *
- * @param target
+ * Use {@link RpcTaskScanner#startup()} instead
*/
- public void add(Scannable target) {
- scanList.add(target);
+ @Deprecated
+ public void start() {
+ startup();
}
- /**
- * Shutdown the scheduled service.
+ /**
+ * Add scan target.
*/
- public void shutdown() {
- scheduledService.shutdown();
+ public void add(Scannable target) {
+ scanList.add(target);
}
}
diff --git a/src/main/java/com/alipay/remoting/rpc/protocol/RpcHeartbeatTrigger.java b/src/main/java/com/alipay/remoting/rpc/protocol/RpcHeartbeatTrigger.java
index 9c15c07a..1b89a676 100644
--- a/src/main/java/com/alipay/remoting/rpc/protocol/RpcHeartbeatTrigger.java
+++ b/src/main/java/com/alipay/remoting/rpc/protocol/RpcHeartbeatTrigger.java
@@ -105,13 +105,14 @@ public void onResponse(InvokeFuture future) {
}
ctx.channel().attr(Connection.HEARTBEAT_COUNT).set(0);
} else {
- if (response == null) {
+ if (response != null
+ && response.getResponseStatus() == ResponseStatus.TIMEOUT) {
logger.error("Heartbeat timeout! The address is {}",
RemotingUtil.parseRemoteAddress(ctx.channel()));
} else {
logger.error(
"Heartbeat exception caught! Error code={}, The address is {}",
- response.getResponseStatus(),
+ response == null ? null : response.getResponseStatus(),
RemotingUtil.parseRemoteAddress(ctx.channel()));
}
Integer times = ctx.channel().attr(Connection.HEARTBEAT_COUNT).get();
diff --git a/src/test/java/com/alipay/remoting/inner/connection/ConcurrentCreateConnectionTest.java b/src/test/java/com/alipay/remoting/inner/connection/ConcurrentCreateConnectionTest.java
index a948967a..3eb43230 100644
--- a/src/test/java/com/alipay/remoting/inner/connection/ConcurrentCreateConnectionTest.java
+++ b/src/test/java/com/alipay/remoting/inner/connection/ConcurrentCreateConnectionTest.java
@@ -18,6 +18,7 @@
import java.util.concurrent.ConcurrentHashMap;
+import com.alipay.remoting.DefaultClientConnectionManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -30,7 +31,6 @@
import com.alipay.remoting.ConnectionEventListener;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.ConnectionSelectStrategy;
-import com.alipay.remoting.DefaultConnectionManager;
import com.alipay.remoting.RandomSelectStrategy;
import com.alipay.remoting.RemotingAddressParser;
import com.alipay.remoting.Url;
@@ -56,8 +56,9 @@ public class ConcurrentCreateConnectionTest {
.getLogger(RpcConnectionManagerTest.class);
private ConcurrentHashMap> userProcessors = new ConcurrentHashMap>();
- private DefaultConnectionManager cm;
- private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy();
+ private DefaultClientConnectionManager cm;
+ private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(
+ null);
private RemotingAddressParser addressParser = new RpcAddressParser();
private ConnectionFactory connectionFactory = new RpcConnectionFactory(
userProcessors,
@@ -74,10 +75,10 @@ public class ConcurrentCreateConnectionTest {
@Before
public void init() {
- cm = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory,
+ cm = new DefaultClientConnectionManager(connectionSelectStrategy, connectionFactory,
connectionEventHandler, connectionEventListener);
cm.setAddressParser(addressParser);
- cm.init();
+ cm.startup();
server = new BoltServer(port);
server.start();
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
diff --git a/src/test/java/com/alipay/remoting/inner/connection/RpcConnectionManagerTest.java b/src/test/java/com/alipay/remoting/inner/connection/RpcConnectionManagerTest.java
index fdaf15e4..b4012338 100644
--- a/src/test/java/com/alipay/remoting/inner/connection/RpcConnectionManagerTest.java
+++ b/src/test/java/com/alipay/remoting/inner/connection/RpcConnectionManagerTest.java
@@ -20,6 +20,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import com.alipay.remoting.DefaultClientConnectionManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -58,8 +59,9 @@ public class RpcConnectionManagerTest {
private ConcurrentHashMap> userProcessors = new ConcurrentHashMap>();
- private DefaultConnectionManager cm;
- private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy();
+ private DefaultClientConnectionManager cm;
+ private ConnectionSelectStrategy connectionSelectStrategy = new RandomSelectStrategy(
+ null);
private RemotingAddressParser addressParser = new RpcAddressParser();
private ConnectionFactory connectionFactory = new RpcConnectionFactory(
userProcessors,
@@ -79,10 +81,10 @@ public class RpcConnectionManagerTest {
@Before
public void init() {
- cm = new DefaultConnectionManager(connectionSelectStrategy, connectionFactory,
+ cm = new DefaultClientConnectionManager(connectionSelectStrategy, connectionFactory,
connectionEventHandler, connectionEventListener);
cm.setAddressParser(addressParser);
- cm.init();
+ cm.startup();
server = new BoltServer(port);
server.start();
server.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
diff --git a/src/test/java/com/alipay/remoting/rpc/RpcServerTest.java b/src/test/java/com/alipay/remoting/rpc/RpcServerTest.java
index d2050b16..50489714 100644
--- a/src/test/java/com/alipay/remoting/rpc/RpcServerTest.java
+++ b/src/test/java/com/alipay/remoting/rpc/RpcServerTest.java
@@ -78,7 +78,7 @@ private void doTestStartAndStop(boolean syncStop) {
try {
rpcServer2.stop();
Assert.fail("Should not reach here");
- } catch (IllegalStateException e) {
+ } catch (Exception e) {
// expect
}
}
diff --git a/src/test/java/com/alipay/remoting/rpc/connectionmanage/ScheduledDisconnectStrategyTest.java b/src/test/java/com/alipay/remoting/rpc/connectionmanage/ScheduledDisconnectStrategyTest.java
index 79e478fe..3f7f9635 100644
--- a/src/test/java/com/alipay/remoting/rpc/connectionmanage/ScheduledDisconnectStrategyTest.java
+++ b/src/test/java/com/alipay/remoting/rpc/connectionmanage/ScheduledDisconnectStrategyTest.java
@@ -180,7 +180,7 @@ public void testCloseFreshSelectConnections_byUserSetting() throws RemotingExcep
connection.removeInvokeFuture(1);
/* Monitor task sleep 500ms*/
Thread.sleep(100);
- Assert.assertEquals(0, clientDisConnectProcessor.getDisConnectTimes());
+ Assert.assertEquals(1, clientDisConnectProcessor.getDisConnectTimes());
Thread.sleep(500);
Assert.assertTrue(0 <= clientDisConnectProcessor.getDisConnectTimes());
}