Skip to content
This repository has been archived by the owner on May 10, 2022. It is now read-only.

Commit

Permalink
fix: close session when unable to connect (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wu Tao authored and foreverneverer committed Jan 6, 2020
1 parent 8c373cb commit 43636af
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,15 @@ public int asyncSend(client_operator op, Runnable callbackFunc, long timeoutInMi
if (cache.state == ConnState.CONNECTED) {
write(entry, cache);
} else {
boolean needConnect = false;
synchronized (pendingSend) {
cache = fields;
if (cache.state == ConnState.CONNECTED) {
write(entry, cache);
} else {
pendingSend.offer(entry);
if (cache.state == ConnState.DISCONNECTED) {
cache = new VolatileFields();
cache.state = ConnState.CONNECTING;
fields = cache;
needConnect = true;
}
}
}
if (needConnect) {
doConnect();
}
tryConnect();
}
return entry.sequenceId;
}
Expand All @@ -132,8 +123,17 @@ public void closeSession() {
} catch (Exception ex) {
logger.warn("close channel {} failed: ", address.toString(), ex);
}
} else if (f.state == ConnState.CONNECTING) { // f.nettyChannel == null
// If our actively-close strategy fails to reconnect the session due to
// some sort of deadlock, close this session and retry.
logger.info("{}: close a connecting session", name());
markSessionDisconnect();
} else {
logger.info("channel {} not connected, skip the close", address.toString());
logger.info(
"{}: session is not connected [state={}, nettyChannel{}=null], skip the close",
name(),
f.state,
f.nettyChannel == null ? "=" : "!");
}
}

Expand All @@ -149,7 +149,34 @@ public final rpc_address getAddress() {
return address;
}

ChannelFuture doConnect() {
@Override
public String toString() {
return address.toString();
}

/**
* Connects to remote host if it is currently disconnected.
*
* @return a nullable ChannelFuture.
*/
public ChannelFuture tryConnect() {
boolean needConnect = false;
synchronized (pendingSend) {
if (fields.state == ConnState.DISCONNECTED) {
VolatileFields cache = new VolatileFields();
cache.state = ConnState.CONNECTING;
fields = cache;
needConnect = true;
}
}
if (needConnect) {
logger.info("{}: the session is disconnected, needs to reconnect", name());
return doConnect();
}
return null;
}

private ChannelFuture doConnect() {
try {
// we will receive the channel connect event in DefaultHandler.ChannelActive
return boot.connect(address.get_ip(), address.get_port())
Expand Down Expand Up @@ -180,6 +207,12 @@ private void markSessionConnected(Channel activeChannel) {
newCache.nettyChannel = activeChannel;

synchronized (pendingSend) {
if (fields.state != ConnState.CONNECTING) {
// this session may have been closed or connected already
logger.info("{}: session is {}, skip to mark it connected", name(), fields.state);
return;
}

while (!pendingSend.isEmpty()) {
RequestEntry e = pendingSend.poll();
if (pendingResponse.get(e.sequenceId) != null) {
Expand Down Expand Up @@ -220,6 +253,7 @@ void markSessionDisconnect() {
cache.state.toString(),
e);
} finally {
logger.info("{}: mark the session to be disconnected from state={}", name(), cache.state);
// ensure the state must be set DISCONNECTED
cache = new VolatileFields();
cache.state = ConnState.DISCONNECTED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void initTableConfiguration(query_cfg_response resp) {
newConfig.replicas.add(newReplicaConfig);
}

FutureGroup futureGroup = new FutureGroup(resp.getPartition_count());
FutureGroup<Void> futureGroup = new FutureGroup<>(resp.getPartition_count());
for (partition_configuration pc : resp.getPartitions()) {
ReplicaConfiguration s = newConfig.replicas.get(pc.getPid().get_pidx());
if (s.ballot != pc.ballot) {
Expand Down Expand Up @@ -177,7 +177,7 @@ void initTableConfiguration(query_cfg_response resp) {
if (s.session == null || !s.session.getAddress().equals(pc.primary)) {
// reset to new primary
s.session = manager_.getReplicaSession(pc.primary);
ChannelFuture fut = s.session.doConnect();
ChannelFuture fut = s.session.tryConnect();
if (fut != null) {
futureGroup.add(fut);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void testDNSResolveHost() throws Exception {
MetaSession session = manager.getMetaSession();
MetaSession meta = Mockito.spy(session);
ReplicaSession meta2 = meta.getMetaList().get(0); // 127.0.0.1:34602
meta2.doConnect();
meta2.tryConnect();
while (meta2.getState() != ReplicaSession.ConnState.CONNECTED) {
Thread.sleep(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.xiaomi.infra.pegasus.thrift.protocol.TProtocol;
import com.xiaomi.infra.pegasus.tools.Toollet;
import com.xiaomi.infra.pegasus.tools.Tools;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -31,12 +33,6 @@
import org.mockito.Mockito;
import org.slf4j.Logger;

/**
* ReplicaSession Tester.
*
* @author [email protected]
* @version 1.0
*/
public class ReplicaSessionTest {
private String[] metaList = {"127.0.0.1:34601", "127.0.0.1:34602", "127.0.0.1:34603"};
private final Logger logger = org.slf4j.LoggerFactory.getLogger(ReplicaSessionTest.class);
Expand Down Expand Up @@ -255,4 +251,50 @@ public void testTryNotifyWithSequenceID() throws Exception {
mockRs.markSessionDisconnect();
Assert.assertEquals(mockRs.getState(), ConnState.DISCONNECTED);
}

@Test
public void testCloseSession() throws InterruptedException {
rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34801");
ReplicaSession rs = manager.getReplicaSession(addr);
rs.tryConnect().awaitUninterruptibly();
Thread.sleep(200);
Assert.assertEquals(rs.getState(), ConnState.CONNECTED);

rs.closeSession();
Thread.sleep(100);
Assert.assertEquals(rs.getState(), ConnState.DISCONNECTED);

rs.fields.state = ConnState.CONNECTING;
rs.closeSession();
Thread.sleep(100);
Assert.assertEquals(rs.getState(), ConnState.DISCONNECTED);
}

@Test
public void testSessionConnectTimeout() throws InterruptedException {
rpc_address addr = new rpc_address();
addr.fromString(
"www.baidu.com:34801"); // this website normally ignores incorrect request without replying

long start = System.currentTimeMillis();
EventLoopGroup rpcGroup = new NioEventLoopGroup(4);
ReplicaSession rs = new ReplicaSession(addr, rpcGroup, 1000);
rs.tryConnect().awaitUninterruptibly();
long end = System.currentTimeMillis();
Assert.assertEquals((end - start) / 1000, 1); // ensure connect failed within 1sec
Thread.sleep(100);
Assert.assertEquals(rs.getState(), ConnState.DISCONNECTED);
}

@Test
public void testSessionTryConnectWhenConnected() throws InterruptedException {
rpc_address addr = new rpc_address();
addr.fromString("127.0.0.1:34801");
ReplicaSession rs = manager.getReplicaSession(addr);
rs.tryConnect().awaitUninterruptibly();
Thread.sleep(100);
Assert.assertEquals(rs.getState(), ConnState.CONNECTED);
Assert.assertNull(rs.tryConnect()); // do not connect again
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public void testConnectAfterQueryMeta() throws Exception {
}
Assert.assertNotNull(table);

Thread.sleep(100);
ArrayList<ReplicaConfiguration> replicas = table.tableConfig_.get().replicas;
for (ReplicaConfiguration r : replicas) {
Assert.assertEquals(r.session.getState(), ReplicaSession.ConnState.CONNECTED);
Expand Down

0 comments on commit 43636af

Please sign in to comment.