Skip to content

Commit

Permalink
反向代理模式丢包修复
Browse files Browse the repository at this point in the history
  • Loading branch information
loveinsky100 committed Mar 31, 2020
1 parent cc11844 commit 952ced9
Show file tree
Hide file tree
Showing 18 changed files with 78,775 additions and 53 deletions.
18 changes: 18 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"configurations": [
{
"type": "java",
"name": "CodeLens (Launch) - Panama",
"request": "launch",
"mainClass": "org.leo.server.panama.vpn.Panama",
"projectName": "vpn"
},
{
"type": "java",
"name": "CodeLens (Launch) - ConfigurationReader",
"request": "launch",
"mainClass": "org.leo.server.panama.vpn.configuration.ConfigurationReader",
"projectName": "vpn"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import org.leo.server.panama.client.AbstractClient;
import org.leo.server.panama.client.ClientResponseDelegate;
import org.leo.server.panama.client.handler.TCPClientHandler;
Expand Down
78,273 changes: 78,273 additions & 0 deletions logs/error.log

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions panama.config
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
{
"password":"123456789",
"port":9899,
"mode":"normal",
"password":"1234567890",
"port":9898,
"proxyEqualsCurrent":true,
"proxyPort":0,
"reversePort":0,
"type":"aes-256-cfb"
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.leo.server.panama.server.tcp;

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import org.leo.server.panama.core.handler.RequestHandler;
import org.leo.server.panama.core.handler.tcp.TCPRequestHandler;
import org.leo.server.panama.server.AbstractServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,22 @@
@ComponentScan("org.leo.server.panama.test.server.function.*")
public class ServerTest {
public static void main(String []args) {
// load panama spring config
Panama.load(args);

// connect: http://localhost:8080
// client send: http://localhost:8080/hello?name=Leo
// server response: Leo
Panama.startHttpServer(10, 8080);

// connect: telnet localhost 8081
// client send: hello?name=Leo
// server response: Leo
Panama.startTcpServer(10, 8081);

// connect: ws://localhost:8082/ws?conn=1
// client send message: hello?name=ANY_STRING
// server response: ANY_STRING
Panama.startWebSocketServer(10, 8082);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ public class ShadowSocksConfiguration {
*/
private String mode;

/**
* encrypt: 加密 raw: 不加密
* 加密信息: encrypt / raw
*/
private String encrypt;

/**
* 加密类型
*/
Expand Down Expand Up @@ -59,6 +65,18 @@ public class ShadowSocksConfiguration {
*/
private int reversePort;


public String getEncrypt() {
if (null == encrypt) {
return "encrypt";
}
return encrypt;
}

public void setEncrypt(String encrypt) {
this.encrypt = encrypt;
}

public String getType() {
if (null == type) {
return "aes-256-cfb";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public abstract class AbstractShadowSocksProxy implements ClientResponseDelegate

public AbstractShadowSocksProxy(Channel clientChannel, Callback finish, ShadowSocksConfiguration shadowSocksConfiguration, NioEventLoopGroup eventLoopGroup, ShadowsocksRequestResolver requestResolver) {
this.clientChannel = clientChannel;
wrapper = WrapperFactory.getInstance(shadowSocksConfiguration.getType(), shadowSocksConfiguration.getPassword(), "encrypt");
wrapper = WrapperFactory.getInstance(shadowSocksConfiguration.getType(), shadowSocksConfiguration.getPassword(), shadowSocksConfiguration.getEncrypt());
this.finish = finish;
this.eventLoopGroup = eventLoopGroup;
this.requestResolver = requestResolver;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import org.leo.server.panama.vpn.configuration.ShadowSocksConfiguration;
import org.leo.server.panama.vpn.constant.VPNConstant;
import org.leo.server.panama.vpn.proxy.TCPProxy;
import org.leo.server.panama.vpn.proxy.impl.AgentShadowSocksProxy;
import org.leo.server.panama.vpn.proxy.impl.Redirect2ReverseShadowSocksProxy;
import org.leo.server.panama.vpn.proxy.impl.ReverseShadowSocksProxy;
import org.leo.server.panama.vpn.proxy.impl.ShadowSocksProxy;
import org.leo.server.panama.vpn.proxy.impl.*;
import org.leo.server.panama.vpn.reverse.core.ReverseCoreServer;
import org.leo.server.panama.vpn.shadowsocks.ShadowsocksRequestResolver;
import org.leo.server.panama.vpn.util.Callback;
Expand Down Expand Up @@ -38,8 +35,18 @@ public static void createReverseServer(ShadowSocksConfiguration shadowSocksConfi
}).start();
}

public static TCPProxy createRePlayShadowSocksProxy(Channel channel, Callback callback, ShadowSocksConfiguration shadowSocksConfiguration) {
// 测试代理,测试用
return new RePlayShadowSocksProxy(
channel,
callback,
shadowSocksConfiguration,
eventLoopGroup,
requestResolver);
}

public static TCPProxy createReverseShadowSocksProxy(Channel channel, Callback callback, ShadowSocksConfiguration shadowSocksConfiguration) {
// 反响代理TCP服务
// 反向代理TCP服务
return new ReverseShadowSocksProxy(
channel,
callback,
Expand All @@ -49,7 +56,7 @@ public static TCPProxy createReverseShadowSocksProxy(Channel channel, Callback c
}

public static TCPProxy createRedirect2ReverseShadowSocksProxy(Channel channel, Callback callback, ShadowSocksConfiguration shadowSocksConfiguration) {
// 反响代理TCP服务
// 反向代理TCP服务
return new Redirect2ReverseShadowSocksProxy(
channel,
callback,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public AgentShadowSocksProxy(Channel clientChannel,
NioEventLoopGroup eventLoopGroup,
ShadowsocksRequestResolver requestResolver) {
super(clientChannel, finish, shadowSocksConfiguration, eventLoopGroup, requestResolver);
agentWrapper = WrapperFactory.getInstance(shadowSocksConfiguration.getProxyType(), shadowSocksConfiguration.getProxyPassword(), "encrypt");
agentWrapper = WrapperFactory.getInstance(shadowSocksConfiguration.getProxyType(), shadowSocksConfiguration.getProxyPassword(), shadowSocksConfiguration.getEncrypt());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.leo.server.panama.vpn.proxy.impl;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import org.apache.log4j.Logger;
import org.leo.server.panama.client.Client;
import org.leo.server.panama.vpn.configuration.ShadowSocksConfiguration;
import org.leo.server.panama.vpn.shadowsocks.ShadowsocksRequestResolver;
import org.leo.server.panama.vpn.util.Callback;

import java.util.function.Function;

/**
* 收到后重新返回给发送方,测试用
* @author xuyangze
* @date 2018/11/20 8:13 PM
*/
public class RePlayShadowSocksProxy extends ShadowSocksProxy {
private final static Logger log = Logger.getLogger(RePlayShadowSocksProxy.class);
private Function<byte[], ByteBuf> appendTagFunc;

public RePlayShadowSocksProxy(Channel clientChannel,
Callback finish,
ShadowSocksConfiguration shadowSocksConfiguration,
NioEventLoopGroup eventLoopGroup,
ShadowsocksRequestResolver requestResolver) {
super(clientChannel, finish, shadowSocksConfiguration, eventLoopGroup, requestResolver);
}

@Override
protected void send2Client(byte[] data) {
data = wrapper.wrap(data);

// 返回的结果会添加tag标记,此tag为代理请求的tag
clientChannel.write(appendTagFunc.apply(data));
clientChannel.flush();
log.info("client <---------------- proxy " + data.length + " byte");
}

@Override
public void onConnectClosed(Client client) {
// send close data
log.info("client <---------------- proxy closed");
if (null != finish) {
try {
finish.call();
} catch (Exception e) {
//
}
}
}

public Function<byte[], ByteBuf> getAppendTagFunc() {
return appendTagFunc;
}

public void setAppendTagFunc(Function<byte[], ByteBuf> appendTagFunc) {
this.appendTagFunc = appendTagFunc;
}

public void closeTargetConnection() {
if (null != this.redirectClient) {
this.redirectClient.close();
}
}

@Override
public void doProxy(byte[] data) {
log.info("client ----------------> proxy " + data.length + " byte, data is :" + new String(data));
send2Client(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ public Redirect2ReverseShadowSocksProxy(Channel clientChannel,
this.reverseCoreServer = reverseCoreServer;
}

@Override
public boolean shouldDoPerResponse() {
return false;
}

@Override
public boolean shouldDoCompleteResponse() {
return true;
}
// @Override
// public boolean shouldDoPerResponse() {
// return true;
// }
//
// @Override
// public boolean shouldDoCompleteResponse() {
// return true;
// }

@Override
public void doProxy(byte []data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
import org.leo.server.panama.util.NumberUtils;
import org.leo.server.panama.vpn.reverse.constant.ReverseConstants;
import org.leo.server.panama.vpn.reverse.core.ReverseCoreServer;
import org.leo.server.panama.vpn.util.MD5;

import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 反向代理客户端,本质上是一个服务端
Expand All @@ -21,12 +24,13 @@ public class ReverseTCPClient implements Client {
private ReverseCoreServer reverseCoreServer;
private ClientResponseDelegate<TCPResponse> delegate;
private volatile boolean closed = true;
private static int tag = 10000;
private static AtomicInteger TAG = new AtomicInteger(10000);
private int tag = 0;

public ReverseTCPClient(ClientResponseDelegate<TCPResponse> delegate, ReverseCoreServer reverseCoreServer) {
this.reverseCoreServer = reverseCoreServer;
this.delegate = delegate;
tag ++;
this.tag = TAG.incrementAndGet();
}

@Override
Expand All @@ -38,11 +42,10 @@ public Client connect(InetSocketAddress inetSocketAddress) {
@Override
public void send(byte[] data, int timeout) {
if (isClose()) {
System.out.println("send when closed for tag: " + tag);
return;
}

reverseCoreServer.send2Client(tag, data, responseData -> {
reverseCoreServer.send2Client(this.tag, data, responseData -> {
TCPResponse response = new TCPResponse(responseData);
delegate.doPerResponse(ReverseTCPClient.this, response);
}, () -> this.close());
Expand All @@ -66,6 +69,6 @@ public void close() {

this.closed = true;
delegate.onConnectClosed(ReverseTCPClient.this);
reverseCoreServer.send2Client(tag, NumberUtils.intToByteArray(ReverseConstants.CLOSE_MAGIC), null, null);
reverseCoreServer.send2Client(this.tag, NumberUtils.intToByteArray(ReverseConstants.CLOSE_MAGIC), null, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.leo.server.panama.vpn.reverse.protocol.ReverseProtocol;
import org.leo.server.panama.vpn.util.Callback;
import org.leo.server.panama.vpn.util.LocalCacheFactory;
import org.leo.server.panama.vpn.util.MD5;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -22,9 +23,9 @@
import java.util.function.Supplier;

/**
* 反向代理服务端,实际上是作为客户端存在内网服务器中
* 想代理服务器发起一条长连接,接收代码服务器的请求
* 收到代理服务器请求后发起网络请求,然后回调给代理服务器
* 反向代理服务端,作为服务端存在外网服务器中
* 内网客户端连接到此端口,并维持一条稳定的连接
* 代理服务器请求后发起网络请求,然后回调给代理服务器
* 由于复用一条连接,所有的请求带上tag标记,判断是否属于同一个请求
* @author xuyangze
* @date 2018/11/21 3:17 PM
Expand All @@ -37,6 +38,8 @@ public class ReverseCoreServer extends TCPServer implements RequestHandler<TCPRe
private Cache<Integer, Consumer<byte []>> tag2ConsumerMap = LocalCacheFactory.createCache(60 * 1000 * 5, 20000);
private Cache<Integer, Callback> tag2ClosedMap = LocalCacheFactory.createCache(60 * 1000 * 5, 20000);

private ReverseProtocol.ReverseProtocolData lastUnCompleteReverseProtocolData;

public ReverseCoreServer(int port) {
super(port);
}
Expand Down Expand Up @@ -75,14 +78,15 @@ public void onConnect(ChannelHandlerContext ctx) {

@Override
public void doRequest(TCPRequest request) {
List<ReverseProtocol.ReverseProtocolData> reverseProtocolDatas = ReverseProtocol.decodeProtocol(request.getData());
List<ReverseProtocol.ReverseProtocolData> reverseProtocolDatas = ReverseProtocol.decodeProtocol(request.getData(), lastUnCompleteReverseProtocolData);
lastUnCompleteReverseProtocolData = null;

if (null == reverseProtocolDatas || reverseProtocolDatas.size() == 0) {
log.error("reverseProtocolDatas is empty, but data size is: " + request.getData().length);
return;
}

reverseProtocolDatas.forEach(reverseProtocolData -> this.doRequest(reverseProtocolData.getTag(), reverseProtocolData.getData()));
reverseProtocolDatas.forEach(reverseProtocolData -> this.doRequest(reverseProtocolData));
}

@Override
Expand All @@ -92,11 +96,19 @@ public void onClose(ChannelHandlerContext ctx) {
callback.call();
}

lastUnCompleteReverseProtocolData = null;
tag2ClosedMap.invalidateAll();
tag2ConsumerMap.invalidateAll();
}

private void doRequest(int tag, byte[] data) {
private void doRequest(ReverseProtocol.ReverseProtocolData reverseProtocolData) {
if (!reverseProtocolData.isComplete()) {
lastUnCompleteReverseProtocolData = reverseProtocolData;
return;
}

int tag = reverseProtocolData.getTag();
byte[] data = reverseProtocolData.getData();
Consumer<byte []> consumer = tag2ConsumerMap.getIfPresent(tag);
if (null != consumer) {
if (data.length != 4) {
Expand All @@ -110,6 +122,8 @@ private void doRequest(int tag, byte[] data) {
tag2ClosedMap.invalidate(tag);
callback.call();
}
} else {
consumer.accept(data);
}
}
}
Expand Down
Loading

0 comments on commit 952ced9

Please sign in to comment.