Skip to content

Commit

Permalink
Release/1.6.0 (#160)
Browse files Browse the repository at this point in the history
* Add LifeCycle component and refactor some code. (#113)

* Make getAfterRun un-block avoid deadlock. (#107) (#109)

* update readme

* update version to 1.6.0-SNAPSHOT

* 修改RpcClientDemoByMain 和RpcServerDemoByMain 的logger引用对象 (#112)

* 修改日志引用的对象

* add lifecycle component

* fix NPE in ReconnectManager and refactor some code

* Refactor some components with LifeCycle interface. (#114)

Refactor some components with LifeCycle interface.

* Refactor RpcClient with LifeCycle interface and add option module. (#116)

Refactor RpcClient with LifeCycle interface and add option module.

* fix #131 (#132)


* refactor DefaultConnectionManager to fix issue: #131

* refactory ConnectionManager to support start/shutdown operations (#135)

* 重构ScheduledDisconnectStrategy实现 (#136)

* refactory ConnectionManager to support start/shutdown operations

* Reconnector class is used instead of ReconnectManager class

* refactor ScheduledDisconnectStrategy

* Support config strategy (#138)

* support config user ConnectionSelectStrategy impl

* add LifeCycle interface to RemotingServer

* fix some code according to CR

* fix #137 (#141)

* update version 1.6.0-SNAPSHOT->1.6.0
  • Loading branch information
dbl-x authored May 31, 2019
1 parent 124fbf4 commit 088df26
Show file tree
Hide file tree
Showing 51 changed files with 2,186 additions and 1,292 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.5.5</version>
<version>1.6.0</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down Expand Up @@ -66,7 +66,7 @@
<properties>
<cobertura.maven.plugin>2.6</cobertura.maven.plugin>
<coveralls.maven.plugin>3.2.1</coveralls.maven.plugin>
<hessian.version>3.3.2</hessian.version>
<hessian.version>3.3.6</hessian.version>
<java.version>1.6</java.version>
<license.maven.plugin>3.0</license.maven.plugin>
<maven.assembly.plugin>3.0.0</maven.assembly.plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,46 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.remoting.config;
package com.alipay.remoting;

import com.alipay.remoting.config.BoltOption;
import com.alipay.remoting.config.BoltOptions;
import com.alipay.remoting.config.ConfigManager;
import com.alipay.remoting.config.Configurable;
import com.alipay.remoting.config.ConfigurableInstance;
import com.alipay.remoting.config.configs.ConfigContainer;
import com.alipay.remoting.config.configs.ConfigItem;
import com.alipay.remoting.config.configs.ConfigType;
import com.alipay.remoting.config.configs.DefaultConfigContainer;
import com.alipay.remoting.config.switches.GlobalSwitch;

/**
* common implementation for a configurable instance
*
* @author tsui
* @version $Id: AbstractConfigurableInstance.java, v 0.1 2018-07-30 21:11 tsui Exp $$
* @author chengyi ([email protected]) 2018-11-07 15:22
*/
public class AbstractConfigurableInstance implements ConfigurableInstance {
private ConfigContainer configContainer = new DefaultConfigContainer();
private GlobalSwitch globalSwitch = new GlobalSwitch();
private ConfigType configType;
public abstract class AbstractBoltClient extends AbstractLifeCycle implements BoltClient,
ConfigurableInstance {

private final BoltOptions options;
private final ConfigType configType;
private final GlobalSwitch globalSwitch;
private final ConfigContainer configContainer;

public AbstractBoltClient() {
this.options = new BoltOptions();
this.configType = ConfigType.CLIENT_SIDE;
this.globalSwitch = new GlobalSwitch();
this.configContainer = new DefaultConfigContainer();
}

protected AbstractConfigurableInstance(ConfigType configType) {
this.configType = configType;
@Override
public <T> T option(BoltOption<T> option) {
return options.option(option);
}

@Override
public <T> Configurable option(BoltOption<T> option, T value) {
options.option(option, value);
return this;
}

@Override
Expand All @@ -55,23 +74,21 @@ public void initWriteBufferWaterMark(int low, int high) {

@Override
public int netty_buffer_low_watermark() {
if (null != configContainer
&& configContainer.contains(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK)) {
return (Integer) configContainer
.get(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK);
Object config = configContainer.get(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK);
if (config != null) {
return (Integer) config;
} else {
return ConfigManager.netty_buffer_low_watermark();
}
}

@Override
public int netty_buffer_high_watermark() {
if (null != configContainer
&& configContainer.contains(configType, ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK)) {
return (Integer) configContainer.get(configType,
ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK);
Object config = configContainer.get(configType, ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK);
if (config != null) {
return (Integer) config;
} else {
return ConfigManager.netty_buffer_high_watermark();
}
}
}
}
50 changes: 50 additions & 0 deletions src/main/java/com/alipay/remoting/AbstractLifeCycle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.remoting;

/**
* @author chengyi ([email protected]) 2018-11-05 14:43
*/
public abstract class AbstractLifeCycle implements LifeCycle {

private volatile boolean isStarted = false;

@Override
public void startup() throws LifeCycleException {
if (!isStarted) {
isStarted = true;
return;
}

throw new LifeCycleException("this component has started");
}

@Override
public void shutdown() throws LifeCycleException {
if (isStarted) {
isStarted = false;
return;
}

throw new LifeCycleException("this component has closed");
}

@Override
public boolean isStarted() {
return isStarted;
}
}
21 changes: 9 additions & 12 deletions src/main/java/com/alipay/remoting/AbstractRemotingProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,17 @@ public AbstractRemotingProcessor(CommandFactory commandFactory) {

/**
* Constructor.
* @param executor
* @param executor ExecutorService
*/
public AbstractRemotingProcessor(ExecutorService executor) {
this.executor = executor;
}

/**
* Constructor.
* @param executor
*
* @param commandFactory CommandFactory
* @param executor ExecutorService
*/
public AbstractRemotingProcessor(CommandFactory commandFactory, ExecutorService executor) {
this.commandFactory = commandFactory;
Expand All @@ -70,19 +72,17 @@ public AbstractRemotingProcessor(CommandFactory commandFactory, ExecutorService
/**
* Do the process.
*
* @param ctx
* @param msg
* @throws Exception
* @param ctx RemotingContext
* @param msg T
*/
public abstract void doProcess(RemotingContext ctx, T msg) throws Exception;

/**
* Process the remoting command with its own executor or with the defaultExecutor if its own if null.
*
* @param ctx
* @param msg
* @param defaultExecutor
* @throws Exception
* @param ctx RemotingContext
* @param msg T
* @param defaultExecutor ExecutorService, default executor
*/
@Override
public void process(RemotingContext ctx, T msg, ExecutorService defaultExecutor)
Expand Down Expand Up @@ -139,9 +139,6 @@ public ProcessTask(RemotingContext ctx, T msg) {
this.msg = msg;
}

/**
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
try {
Expand Down
136 changes: 103 additions & 33 deletions src/main/java/com/alipay/remoting/AbstractRemotingServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@
package com.alipay.remoting;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alipay.remoting.config.BoltOption;
import com.alipay.remoting.config.BoltOptions;
import com.alipay.remoting.config.ConfigManager;
import com.alipay.remoting.config.Configurable;
import com.alipay.remoting.config.ConfigurableInstance;
import com.alipay.remoting.config.configs.ConfigContainer;
import com.alipay.remoting.config.configs.ConfigItem;
import com.alipay.remoting.config.configs.DefaultConfigContainer;
import com.alipay.remoting.config.switches.GlobalSwitch;
import org.slf4j.Logger;

import com.alipay.remoting.config.AbstractConfigurableInstance;
import com.alipay.remoting.config.configs.ConfigType;
import com.alipay.remoting.log.BoltLoggerFactory;

Expand All @@ -31,61 +38,78 @@
* @author jiangping
* @version $Id: AbstractRemotingServer.java, v 0.1 2015-9-5 PM7:37:48 tao Exp $
*/
public abstract class AbstractRemotingServer extends AbstractConfigurableInstance implements
RemotingServer {
public abstract class AbstractRemotingServer extends AbstractLifeCycle implements RemotingServer,
ConfigurableInstance {

private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");
private static final Logger logger = BoltLoggerFactory.getLogger("CommonDefault");

private AtomicBoolean started = new AtomicBoolean(false);
private String ip;
private int port;
private String ip;
private int port;

private final BoltOptions options;
private final ConfigType configType;
private final GlobalSwitch globalSwitch;
private final ConfigContainer configContainer;

public AbstractRemotingServer(int port) {
this(new InetSocketAddress(port).getAddress().getHostAddress(), port);
}

public AbstractRemotingServer(String ip, int port) {
super(ConfigType.SERVER_SIDE);
this.ip = ip;
this.port = port;

this.options = new BoltOptions();
this.configType = ConfigType.SERVER_SIDE;
this.globalSwitch = new GlobalSwitch();
this.configContainer = new DefaultConfigContainer();
}

@Override
@Deprecated
public void init() {
// Do not call this method, it will be removed in the next version
}

@Override
@Deprecated
public boolean start() {
if (started.compareAndSet(false, true)) {
try {
doInit();

logger.warn("Prepare to start server on port {} ", port);
if (doStart()) {
logger.warn("Server started on port {}", port);
return true;
} else {
logger.warn("Failed starting server on port {}", port);
return false;
}
} catch (Throwable t) {
this.stop();// do stop to ensure close resources created during doInit()
throw new IllegalStateException("ERROR: Failed to start the Server!", t);
startup();
return true;
}

@Override
@Deprecated
public boolean stop() {
shutdown();
return true;
}

@Override
public void startup() throws LifeCycleException {
super.startup();

try {
doInit();

logger.warn("Prepare to start server on port {} ", port);
if (doStart()) {
logger.warn("Server started on port {}", port);
} else {
logger.warn("Failed starting server on port {}", port);
throw new LifeCycleException("Failed starting server on port: " + port);
}
} else {
String errMsg = "ERROR: The server has already started!";
logger.error(errMsg);
throw new IllegalStateException(errMsg);
} catch (Throwable t) {
this.shutdown();// do stop to ensure close resources created during doInit()
throw new IllegalStateException("ERROR: Failed to start the Server!", t);
}
}

@Override
public boolean stop() {
if (started.compareAndSet(true, false)) {
return this.doStop();
} else {
throw new IllegalStateException("ERROR: The server has already stopped!");
public void shutdown() throws LifeCycleException {
super.shutdown();
if (!doStop()) {
throw new LifeCycleException("doStop fail");
}
}

Expand All @@ -105,4 +129,50 @@ public int port() {

protected abstract boolean doStop();

@Override
public <T> T option(BoltOption<T> option) {
return options.option(option);
}

@Override
public <T> Configurable option(BoltOption<T> option, T value) {
options.option(option, value);
return this;
}

@Override
public ConfigContainer conf() {
return this.configContainer;
}

@Override
public GlobalSwitch switches() {
return this.globalSwitch;
}

@Override
public void initWriteBufferWaterMark(int low, int high) {
this.configContainer.set(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK, low);
this.configContainer.set(configType, ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK, high);
}

@Override
public int netty_buffer_low_watermark() {
Object config = configContainer.get(configType, ConfigItem.NETTY_BUFFER_LOW_WATER_MARK);
if (config != null) {
return (Integer) config;
} else {
return ConfigManager.netty_buffer_low_watermark();
}
}

@Override
public int netty_buffer_high_watermark() {
Object config = configContainer.get(configType, ConfigItem.NETTY_BUFFER_HIGH_WATER_MARK);
if (config != null) {
return (Integer) config;
} else {
return ConfigManager.netty_buffer_high_watermark();
}
}
}
Loading

0 comments on commit 088df26

Please sign in to comment.