Skip to content

Commit

Permalink
handling reconnection situation with no connection binding with chann…
Browse files Browse the repository at this point in the history
…el (#166)

* 1. upgrade version to 1.4.8
2. fix NPE in channelInactive
3. handling reconnection situation with no connection binding with channel

* du not scan connection pool associated with task that has not been done

* fix travis.yml

* 1. print user event log
2. add CONNECT_FAILED event type
  • Loading branch information
dbl-x authored and yuemingliang committed Jun 26, 2019
1 parent 8495d99 commit d2fbaf0
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 48 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ jdk:
- oraclejdk8
- openjdk7

dist: trusty

before_install:
- echo "Downloading Maven 3.2.5"
&& wget https://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.zip
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.4.7</version>
<version>1.4.8</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
82 changes: 46 additions & 36 deletions src/main/java/com/alipay/remoting/ConnectionEventHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,35 +139,55 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
Attribute attr = ctx.channel().attr(Connection.CONNECTION);
if (null != attr) {
// add reconnect task
if (this.globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH)) {
Connection conn = (Connection) attr.get();
if (reconnectManager != null) {
reconnectManager.addReconnectTask(conn.getUrl());
}
Connection conn = (Connection) attr.get();
// if conn is null, means that channel has been inactive before binding with connection
// this situation will fire a CLOSE event in ConnectionFactory
if (conn != null) {
userEventTriggered(ctx, ConnectionEventType.CLOSE);
}
// trigger close connection event
onEvent((Connection) attr.get(), remoteAddress, ConnectionEventType.CLOSE);
}
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
if (event instanceof ConnectionEventType) {
switch ((ConnectionEventType) event) {
ConnectionEventType eventType = (ConnectionEventType) event;
Channel channel = ctx.channel();
if (channel == null) {
logger
.warn(
"channel null when handle user triggered event in ConnectionEventHandler! eventType: {}",
eventType.name());
return;
}
Connection connection = channel.attr(Connection.CONNECTION).get();
if (connection == null) {
logger
.error(
"[BUG]connection is null when handle user triggered event in ConnectionEventHandler! eventType: {}",
eventType.name());
return;
}

final String remoteAddress = RemotingUtil.parseRemoteAddress(ctx.channel());
final String localAddress = RemotingUtil.parseLocalAddress(ctx.channel());
logger.info("trigger user event, local[{}], remote[{}], event: {}", localAddress,
remoteAddress, eventType.name());

switch (eventType) {
case CONNECT:
Channel channel = ctx.channel();
if (null != channel) {
Connection connection = channel.attr(Connection.CONNECTION).get();
this.onEvent(connection, connection.getUrl().getOriginUrl(),
ConnectionEventType.CONNECT);
} else {
logger
.warn("channel null when handle user triggered event in ConnectionEventHandler!");
}
onEvent(connection, connection.getUrl().getOriginUrl(),
ConnectionEventType.CONNECT);
break;
case CONNECT_FAILED:
case CLOSE:
case EXCEPTION:
submitReconnectTaskIfNecessary(connection.getUrl());
onEvent(connection, connection.getUrl().getOriginUrl(), eventType);
break;
default:
return;
logger.error("[BUG]unknown event: {}", eventType.name());
break;
}
} else {
super.userEventTriggered(ctx, event);
Expand All @@ -185,12 +205,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.channel().close();
}

/**
*
* @param conn
* @param remoteAddress
* @param type
*/
private void submitReconnectTaskIfNecessary(Url url) {
if (globalSwitch.isOn(GlobalSwitch.CONN_RECONNECT_SWITCH) && reconnectManager != null) {
reconnectManager.addReconnectTask(url);
}
}

private void onEvent(final Connection conn, final String remoteAddress,
final ConnectionEventType type) {
if (this.eventListener != null) {
Expand Down Expand Up @@ -255,7 +275,7 @@ public void setReconnectManager(ReconnectManager reconnectManager) {

/**
* Dispatch connection event.
*
*
* @author jiangping
* @version $Id: ConnectionEventExecutor.java, v 0.1 Mar 4, 2016 9:20:15 PM tao Exp $
*/
Expand All @@ -265,11 +285,6 @@ public class ConnectionEventExecutor {
new LinkedBlockingQueue<Runnable>(10000),
new NamedThreadFactory("Bolt-conn-event-executor", true));

/**
* Process event.
*
* @param event
*/
public void onEvent(Runnable event) {
try {
executor.execute(event);
Expand All @@ -279,11 +294,6 @@ public void onEvent(Runnable event) {
}
}

/**
* print info log
* @param format
* @param addr
*/
private void infoLog(String format, String addr) {
if (logger.isInfoEnabled()) {
if (StringUtils.isNotEmpty(addr)) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/alipay/remoting/ConnectionEventType.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
* @version $Id: ConnectionEventType.java, v 0.1 Mar 4, 2016 8:03:27 PM tao Exp $
*/
public enum ConnectionEventType {
CONNECT, CLOSE, EXCEPTION;
CONNECT, CONNECT_FAILED, CLOSE, EXCEPTION;
}
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,14 @@ public void scan() {
Iterator<String> iter = this.connTasks.keySet().iterator();
while (iter.hasNext()) {
String poolKey = iter.next();
ConnectionPool pool = this.getConnectionPool(this.connTasks.get(poolKey));
RunStateRecordedFutureTask<ConnectionPool> task = this.connTasks.get(poolKey);
if (!task.isDone()) {
logger.info("task(poolKey={}) is not done, do not scan the connection pool",
poolKey);
continue;
}

ConnectionPool pool = this.getConnectionPool(task);
if (null != pool) {
pool.scan();
if (pool.isEmpty()) {
Expand Down
30 changes: 23 additions & 7 deletions src/main/java/com/alipay/remoting/rpc/RpcConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.TimeUnit;

import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
import org.slf4j.Logger;

import com.alipay.remoting.Connection;
Expand Down Expand Up @@ -132,9 +133,14 @@ protected void initChannel(SocketChannel channel) throws Exception {
public Connection createConnection(Url url) throws Exception {
ChannelFuture future = doCreateConnection(url.getIp(), url.getPort(),
url.getConnectTimeout());
Connection conn = new Connection(future.channel(),
ProtocolCode.fromBytes(url.getProtocol()), url.getVersion(), url);
future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
Channel channel = future.channel();
Connection conn = new Connection(channel, ProtocolCode.fromBytes(url.getProtocol()),
url.getVersion(), url);
if (channel.isActive()) {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
}
return conn;
}

Expand All @@ -145,10 +151,15 @@ public Connection createConnection(Url url) throws Exception {
public Connection createConnection(String targetIP, int targetPort, int connectTimeout)
throws Exception {
ChannelFuture future = doCreateConnection(targetIP, targetPort, connectTimeout);
Connection conn = new Connection(future.channel(),
Channel channel = future.channel();
Connection conn = new Connection(channel,
ProtocolCode.fromBytes(RpcProtocol.PROTOCOL_CODE), RpcProtocolV2.PROTOCOL_VERSION_1,
new Url(targetIP, targetPort));
future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
if (channel.isActive()) {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
}
return conn;
}

Expand All @@ -159,10 +170,15 @@ public Connection createConnection(String targetIP, int targetPort, int connectT
public Connection createConnection(String targetIP, int targetPort, byte version,
int connectTimeout) throws Exception {
ChannelFuture future = doCreateConnection(targetIP, targetPort, connectTimeout);
Connection conn = new Connection(future.channel(),
Channel channel = future.channel();
Connection conn = new Connection(channel,
ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE), version, new Url(targetIP,
targetPort));
future.channel().pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
if (channel.isActive()) {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT);
} else {
channel.pipeline().fireUserEventTriggered(ConnectionEventType.CONNECT_FAILED);
}
return conn;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/alipay/remoting/util/FutureTaskUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ public static <T> T getFutureTaskResult(RunStateRecordedFutureTask<T> task, Logg
} catch (ExecutionException e) {
logger.error("Future task execute failed!", e);
} catch (FutureTaskNotRunYetException e) {
logger.error("Future task has not run yet!", e);
logger.warn("Future task has not run yet!", e);
} catch (FutureTaskNotCompleted e) {
logger.error("Future task has not completed!", e);
logger.warn("Future task has not completed!", e);
}
}
return t;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.remoting.rpc.connectionmanage;

import com.alipay.remoting.Connection;
import com.alipay.remoting.ConnectionEventProcessor;
import com.alipay.remoting.ConnectionEventType;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.remoting.rpc.common.BoltServer;
import com.alipay.remoting.rpc.common.CONNECTEventProcessor;
import org.junit.Assert;
import org.junit.Test;

/**
* @author chengyi ([email protected]) 2019-06-25 19:59
*/
public class ConnectionExceptionTest {

@Test
public void testConnectionException() throws RemotingException, InterruptedException {
CONNECTEventProcessor serverConnectProcessor = new CONNECTEventProcessor();

BoltServer boltServer = new BoltServer(1024);
boltServer.addConnectionEventProcessor(ConnectionEventType.CONNECT, serverConnectProcessor);
boltServer.start();

final String[] closedUrl = new String[1];
RpcClient client = new RpcClient();
client.enableReconnectSwitch();
client.addConnectionEventProcessor(ConnectionEventType.CLOSE,
new ConnectionEventProcessor() {
@Override
public void onEvent(String remoteAddr, Connection conn) {
closedUrl[0] = remoteAddr;
}
});
client.init();

Connection connection = client.getConnection("127.0.0.1:1024", 1000);
Thread.sleep(10);
Assert.assertEquals(1, serverConnectProcessor.getConnectTimes());

connection.getChannel().close();

Thread.sleep(100);
Assert.assertTrue("127.0.0.1:1024".equals(closedUrl[0]));

// connection has been created by ReconnectManager
Thread.sleep(1000 * 2);
Assert.assertEquals(2, serverConnectProcessor.getConnectTimes());
connection = client.getConnection("127.0.0.1:1024", 1000);
Assert.assertTrue(connection.isFine());
Assert.assertEquals(2, serverConnectProcessor.getConnectTimes());

boltServer.stop();
}
}

0 comments on commit d2fbaf0

Please sign in to comment.