diff --git a/.travis.yml b/.travis.yml
index 88d3d69c..bc44c946 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,6 +5,8 @@ jdk:
- oraclejdk8
- openjdk7
+dist: trusty
+
before_install:
- echo "Downloading Maven 3.2.5"
&& wget https://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.zip
diff --git a/pom.xml b/pom.xml
index 13b1ba2e..9bdfaadb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
4.0.0
com.alipay.sofa
bolt
- 1.4.7
+ 1.4.8
jar
${project.groupId}:${project.artifactId}
diff --git a/src/main/java/com/alipay/remoting/ConnectionEventHandler.java b/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
index 2f1f6db0..8f2f809c 100644
--- a/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
+++ b/src/main/java/com/alipay/remoting/ConnectionEventHandler.java
@@ -139,35 +139,55 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
Attribute attr = ctx.channel().attr(Connection.CONNECTION);
if (null != attr) {
- // add reconnect task
- if (this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
- Connection conn = (Connection) attr.get();
- if (reconnectManager != null) {
- reconnectManager.addReconnectTask(conn.getUrl());
- }
+ Connection conn = (Connection) attr.get();
+ // if conn is null, means that channel has been inactive before binding with connection
+ // this situation will fire a CLOSE event in ConnectionFactory
+ if (conn != null) {
+ userEventTriggered(ctx, ConnectionEventType.CLOSE);
}
- // trigger close connection event
- onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
}
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
if (event instanceof ConnectionEventType) {
- switch ((ConnectionEventType) event) {
+ ConnectionEventType eventType = (ConnectionEventType) event;
+ Channel channel = ctx.channel();
+ if (channel == null) {
+ logger
+ .warn(
+ "channel null when handle user triggered event in ConnectionEventHandler! eventType: {}",
+ eventType.name());
+ return;
+ }
+ Connection connection = channel.attr(Connection.CONNECTION).get();
+ if (connection == null) {
+ logger
+ .error(
+ "[BUG]connection is null when handle user triggered event in ConnectionEventHandler! eventType: {}",
+ eventType.name());
+ return;
+ }
+
+ final String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());
+ final String localAddress = RemotingUtil.parseLocalAddress(ctx.channel());
+ logger.info("trigger user event, local[{}], remote[{}], event: {}", localAddress,
+ remoteAddress, eventType.name());
+
+ switch (eventType) {
case CONNECT:
- Channel channel = ctx.channel();
- if (null != channel) {
- Connection connection = channel.attr(Connection.CONNECTION).get();
- this.onEvent(connection, connection.getUrl().getOriginUrl(),
- ConnectionEventType.CONNECT);
- } else {
- logger
- .warn("channel null when handle user triggered event in ConnectionEventHandler!");
- }
+ onEvent(connection, connection.getUrl().getOriginUrl(),
+ ConnectionEventType.CONNECT);
+ break;
+ case CONNECT_FAILED:
+ case CLOSE:
+ case EXCEPTION:
+ submitReconnectTaskIfNecessary(connection.getUrl());
+ onEvent(connection, connection.getUrl().getOriginUrl(), eventType);
break;
default:
- return;
+ logger.error("[BUG]unknown event: {}", eventType.name());
+ break;
}
} else {
super.userEventTriggered(ctx, event);
@@ -185,12 +205,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.channel().close();
}
- /**
- *
- * @param conn
- * @param remoteAddress
- * @param type
- */
+ private void submitReconnectTaskIfNecessary(Url url) {
+ if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
+ reconnectManager.addReconnectTask(url);
+ }
+ }
+
private void onEvent(final Connection conn, final String remoteAddress,
final ConnectionEventType type) {
if (this.eventListener != null) {
@@ -255,7 +275,7 @@ public void setReconnectManager(ReconnectManager reconnectManager) {
/**
* Dispatch connection event.
- *
+ *
* @author jiangping
* @version $Id: ConnectionEventExecutor.java, v 0.1 Mar 4, 2016 9:20:15 PM tao Exp $
*/
@@ -265,11 +285,6 @@ public class ConnectionEventExecutor {
new LinkedBlockingQueue(10000),
new NamedThreadFactory("Bolt-conn-event-executor", true));
- /**
- * Process event.
- *
- * @param event
- */
public void onEvent(Runnable event) {
try {
executor.execute(event);
@@ -279,11 +294,6 @@ public void onEvent(Runnable event) {
}
}
- /**
- * print info log
- * @param format
- * @param addr
- */
private void infoLog(String format, String addr) {
if (logger.isInfoEnabled()) {
if (StringUtils.isNotEmpty(addr)) {
diff --git a/src/main/java/com/alipay/remoting/ConnectionEventType.java b/src/main/java/com/alipay/remoting/ConnectionEventType.java
index 5298add9..07b6afee 100644
--- a/src/main/java/com/alipay/remoting/ConnectionEventType.java
+++ b/src/main/java/com/alipay/remoting/ConnectionEventType.java
@@ -23,5 +23,5 @@
* @version $Id: ConnectionEventType.java, v 0.1 Mar 4, 2016 8:03:27 PM tao Exp $
*/
public enum ConnectionEventType {
- CONNECT, CLOSE, EXCEPTION;
+ CONNECT, CONNECT_FAILED, CLOSE, EXCEPTION;
}
diff --git a/src/main/java/com/alipay/remoting/DefaultConnectionManager.java b/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
index a56f912f..e6468df7 100644
--- a/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
+++ b/src/main/java/com/alipay/remoting/DefaultConnectionManager.java
@@ -436,7 +436,14 @@ public void scan() {
Iterator iter = this.connTasks.keySet().iterator();
while (iter.hasNext()) {
String poolKey = iter.next();
- ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
+ RunStateRecordedFutureTask task = this.connTasks.get(poolKey);
+ if (!task.isDone()) {
+ logger.info("task(poolKey={}) is not done, do not scan the connection pool",
+ poolKey);
+ continue;
+ }
+
+ ConnectionPool pool = this.getConnectionPool(task);
if (null != pool) {
pool.scan();
if (pool.isEmpty()) {
diff --git a/src/main/java/com/alipay/remoting/rpc/RpcConnectionFactory.java b/src/main/java/com/alipay/remoting/rpc/RpcConnectionFactory.java
index a9d4d9fb..bb9e3827 100644
--- a/src/main/java/com/alipay/remoting/rpc/RpcConnectionFactory.java
+++ b/src/main/java/com/alipay/remoting/rpc/RpcConnectionFactory.java
@@ -21,6 +21,7 @@
import java.util.concurrent.TimeUnit;
import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
import org.slf4j.Logger;
import com.alipay.remoting.Connection;
@@ -132,9 +133,14 @@ protected void initChannel(SocketChannel channel) throws Exception {
public Connection createConnection(Url url) throws Exception {
ChannelFuture future = doCreateConnection(url.getIp(), url.getPort(),
url.getConnectTimeout());
- Connection conn = new Connection(future.channel(),
- ProtocolCode.fromBytes(url.getProtocol()), url.getVersion(), url);
- future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
+ Channel channel = future.channel();
+ Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
+ url.getVersion(), url);
+ if (channel.isActive()) {
+ channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
+ } else {
+ channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
+ }
return conn;
}
@@ -145,10 +151,15 @@ public Connection createConnection(Url url) throws Exception {
public Connection createConnection(String targetIP, int targetPort, int connectTimeout)
throws Exception {
ChannelFuture future = doCreateConnection(targetIP, targetPort, connectTimeout);
- Connection conn = new Connection(future.channel(),
+ Channel channel = future.channel();
+ Connection conn = new Connection(channel,
ProtocolCode.fromBytes(RpcProtocol.PROTOCOL_CODE), RpcProtocolV2.PROTOCOL_VERSION_1,
new Url(targetIP, targetPort));
- future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
+ if (channel.isActive()) {
+ channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
+ } else {
+ channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
+ }
return conn;
}
@@ -159,10 +170,15 @@ public Connection createConnection(String targetIP, int targetPort, int connectT
public Connection createConnection(String targetIP, int targetPort, byte version,
int connectTimeout) throws Exception {
ChannelFuture future = doCreateConnection(targetIP, targetPort, connectTimeout);
- Connection conn = new Connection(future.channel(),
+ Channel channel = future.channel();
+ Connection conn = new Connection(channel,
ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE), version, new Url(targetIP,
targetPort));
- future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
+ if (channel.isActive()) {
+ channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
+ } else {
+ channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
+ }
return conn;
}
diff --git a/src/main/java/com/alipay/remoting/util/FutureTaskUtil.java b/src/main/java/com/alipay/remoting/util/FutureTaskUtil.java
index b076ffc3..c25f76f0 100644
--- a/src/main/java/com/alipay/remoting/util/FutureTaskUtil.java
+++ b/src/main/java/com/alipay/remoting/util/FutureTaskUtil.java
@@ -46,9 +46,9 @@ public static T getFutureTaskResult(RunStateRecordedFutureTask task, Logg
} catch (ExecutionException e) {
logger.error("Future task execute failed!", e);
} catch (FutureTaskNotRunYetException e) {
- logger.error("Future task has not run yet!", e);
+ logger.warn("Future task has not run yet!", e);
} catch (FutureTaskNotCompleted e) {
- logger.error("Future task has not completed!", e);
+ logger.warn("Future task has not completed!", e);
}
}
return t;
diff --git a/src/test/java/com/alipay/remoting/rpc/connectionmanage/ConnectionExceptionTest.java b/src/test/java/com/alipay/remoting/rpc/connectionmanage/ConnectionExceptionTest.java
new file mode 100644
index 00000000..5033ed74
--- /dev/null
+++ b/src/test/java/com/alipay/remoting/rpc/connectionmanage/ConnectionExceptionTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.remoting.rpc.connectionmanage;
+
+import com.alipay.remoting.Connection;
+import com.alipay.remoting.ConnectionEventProcessor;
+import com.alipay.remoting.ConnectionEventType;
+import com.alipay.remoting.exception.RemotingException;
+import com.alipay.remoting.rpc.RpcClient;
+import com.alipay.remoting.rpc.common.BoltServer;
+import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @author chengyi (mark.lx@antfin.com) 2019-06-25 19:59
+ */
+public class ConnectionExceptionTest {
+
+ @Test
+ public void testConnectionException() throws RemotingException, InterruptedException {
+ CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();
+
+ BoltServer boltServer = new BoltServer(1024);
+ boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
+ boltServer.start();
+
+ final String[] closedUrl = new String[1];
+ RpcClient client = new RpcClient();
+ client.enableReconnectSwitch();
+ client.addConnectionEventProcessor(ConnectionEventType.CLOSE,
+ new ConnectionEventProcessor() {
+ @Override
+ public void onEvent(String remoteAddr, Connection conn) {
+ closedUrl[0] = remoteAddr;
+ }
+ });
+ client.init();
+
+ Connection connection = client.getConnection("127.0.0.1:1024", 1000);
+ Thread.sleep(10);
+ Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());
+
+ connection.getChannel().close();
+
+ Thread.sleep(100);
+ Assert.assertTrue("127.0.0.1:1024".equals(closedUrl[0]));
+
+ // connection has been created by ReconnectManager
+ Thread.sleep(1000 * 2);
+ Assert.assertEquals(2, serverConnectProcessor.getConnectTimes());
+ connection = client.getConnection("127.0.0.1:1024", 1000);
+ Assert.assertTrue(connection.isFine());
+ Assert.assertEquals(2, serverConnectProcessor.getConnectTimes());
+
+ boltServer.stop();
+ }
+}