Skip to content

Commit

Permalink
添加对session的管理
Browse files Browse the repository at this point in the history
  • Loading branch information
[moon] committed Jul 18, 2022
1 parent 44446db commit 95fde6c
Show file tree
Hide file tree
Showing 9 changed files with 189 additions and 10 deletions.
46 changes: 44 additions & 2 deletions README-CN.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

<div align="center">
<img align="center" src="https://jdk.plus/img/dog.png" alt="drawing" style="width:100%;"/>
</div>
<h3 align="center">这是一款使用netty编写的springboot websocket组件。</h3>
<p align="center">
<a href="https://github.com/JDK-Plus/spring-boot-starter-websocket/blob/master/LICENSE"><img src="https://img.shields.io/github/license/JDK-Plus/spring-boot-starter-websocket.svg" /></a>
Expand All @@ -17,7 +19,7 @@
<dependency>
<groupId>plus.jdk</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
</dependency>
```
## 配置
Expand Down Expand Up @@ -197,3 +199,43 @@ public class DemoHandler {
}
```

### 使用websocket连接主动向用户客户端推送消息

```java
package plus.jdk.broadcast.test.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import plus.jdk.websocket.global.SessionGroupManager;
import plus.jdk.websocket.model.IWsSession;

import javax.annotation.Resource;
import java.util.concurrent.ConcurrentLinkedDeque;


@RestController
public class MessageController {

/**
* 该bean实例已经在底层封装好
*/
@Resource
private SessionGroupManager sessionGroupManager;

@RequestMapping(value = "/message/send", method = {RequestMethod.GET})
public Object sendMessage(@RequestParam String uid, @RequestParam String content){

// 调用sessionGroupManager.getSession()函数获取当前用户在该实例中的所有连接
// 你可以在 IWSSessionAuthenticator 的实现中自行实现自己的session定义,将消息分发给不同的设备
// 或向远端上报当前用户的连接到底在哪些机器上
ConcurrentLinkedDeque<IWsSession<?>> sessions = sessionGroupManager.getSession(uid, "/ws/message");
for(IWsSession<?> wsSession: sessions) {
wsSession.sendText(content); // 发送文本消息
wsSession.sendBinary(content.getBytes()); // 发送二进制消息
}
return "success";
}
}
```
45 changes: 44 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
<div align="center">
<img align="center" src="https://jdk.plus/img/dog.png" alt="drawing" style="width:100%;"/>
</div>
<h3 align="center">A springboot websocket component written using netty。</h3>
<p align="center">
<a href="https://github.com/JDK-Plus/spring-boot-starter-websocket/blob/master/LICENSE"><img src="https://img.shields.io/github/license/JDK-Plus/spring-boot-starter-websocket.svg" /></a>
Expand All @@ -14,7 +17,7 @@
<dependency>
<groupId>plus.jdk</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
</dependency>
```

Expand Down Expand Up @@ -195,3 +198,43 @@ public class DemoHandler {
}
```

### Actively push messages to user clients using websocket connections

```java
package plus.jdk.broadcast.test.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import plus.jdk.websocket.global.SessionGroupManager;
import plus.jdk.websocket.model.IWsSession;

import javax.annotation.Resource;
import java.util.concurrent.ConcurrentLinkedDeque;


@RestController
public class MessageController {

/**
* The bean instance is already encapsulated at the bottom
*/
@Resource
private SessionGroupManager sessionGroupManager;

@RequestMapping(value = "/message/send", method = {RequestMethod.GET})
public Object sendMessage(@RequestParam String uid, @RequestParam String content){

// Call the sessionGroupManager.getSession() function to get all connections of the current user in this instance.
// You can implement your own session definition in the implementation of IWSSessionAuthenticator to distribute messages to different devices
// Or report to the remote end which machines the current user is connected to
ConcurrentLinkedDeque<IWsSession<?>> sessions = sessionGroupManager.getSession(uid, "/ws/message");
for(IWsSession<?> wsSession: sessions) {
wsSession.sendText(content); // 发送文本消息
wsSession.sendBinary(content.getBytes()); // 发送二进制消息
}
return "success";
}
}
```
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>plus.jdk</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>

<name>spring-boot-starter-websocket</name>
<description>A simple websocket component base on netty</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.springframework.web.context.support.WebApplicationObjectSupport;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import plus.jdk.websocket.global.ServerEndpointExporter;
import plus.jdk.websocket.global.SessionGroupManager;
import plus.jdk.websocket.global.WebsocketDispatcher;
import plus.jdk.websocket.annotations.*;
import plus.jdk.websocket.properties.WebsocketProperties;
Expand All @@ -34,6 +35,11 @@ public class WebsocketAutoConfiguration extends WebApplicationObjectSupport impl
public WebsocketAutoConfiguration(WebsocketProperties properties) {
}

@Bean
public SessionGroupManager SessionGroupManager() {
return new SessionGroupManager();
}

@Bean
public WebsocketDispatcher WebsocketDispatcher(WebsocketProperties properties){
return new WebsocketDispatcher(properties, beanFactory);
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/plus/jdk/websocket/global/HttpServerHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import org.springframework.beans.TypeMismatchException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.util.StringUtils;
import plus.jdk.websocket.common.WebsocketCommonException;
import plus.jdk.websocket.properties.WebsocketProperties;
Expand All @@ -31,10 +32,14 @@ public class HttpServerHandler extends SimpleChannelInboundHandler<FullHttpReque

private final NioEventLoopGroup eventLoopGroup;

public HttpServerHandler(WebsocketProperties properties, WebsocketDispatcher websocketDispatcher, NioEventLoopGroup worker) {
private final BeanFactory beanFactory;

public HttpServerHandler(WebsocketProperties properties, WebsocketDispatcher websocketDispatcher,
NioEventLoopGroup worker, BeanFactory beanFactory) {
this.properties = properties;
this.websocketDispatcher = websocketDispatcher;
this.eventLoopGroup = worker;
this.beanFactory = beanFactory;
}

@Override
Expand Down Expand Up @@ -141,9 +146,9 @@ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) t
}
pipeline.addLast(new WebSocketFrameAggregator(Integer.MAX_VALUE));
if (properties.getUseEventExecutorGroup()) {
pipeline.addLast(eventLoopGroup, new WebSocketServerHandler(websocketDispatcher));
pipeline.addLast(eventLoopGroup, new WebSocketServerHandler(websocketDispatcher, beanFactory));
} else {
pipeline.addLast(new WebSocketServerHandler(websocketDispatcher));
pipeline.addLast(new WebSocketServerHandler(websocketDispatcher, beanFactory));
}
handshaker.handshake(channel, req).addListener(future -> {
if (future.isSuccess()) {
Expand Down
49 changes: 49 additions & 0 deletions src/main/java/plus/jdk/websocket/global/SessionGroupManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package plus.jdk.websocket.global;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import plus.jdk.websocket.model.ChannelModel;
import plus.jdk.websocket.model.IWsSession;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

@Slf4j
public class SessionGroupManager {

private final ConcurrentHashMap<Channel, ChannelModel> channelModelMap = new ConcurrentHashMap<>();

private final ConcurrentHashMap<Object, ConcurrentHashMap<String, ConcurrentLinkedDeque<IWsSession<?>>>> sessionMap = new ConcurrentHashMap<>();

protected void addSession(String path, IWsSession<?> iWsSession) {
Object uid = iWsSession.getUserId();
sessionMap.putIfAbsent(uid, new ConcurrentHashMap<>());
sessionMap.get(uid).putIfAbsent(path, new ConcurrentLinkedDeque<>());
sessionMap.get(uid).get(path).add(iWsSession);
channelModelMap.put(iWsSession.getChannel(), new ChannelModel(uid, path));
}

public <T> ConcurrentLinkedDeque<IWsSession<?>> getSession(T userId, String path) {
ConcurrentHashMap<String, ConcurrentLinkedDeque<IWsSession<?>>> userSessionMap = sessionMap.get(userId);
if (userSessionMap == null || userSessionMap.get(path) == null) {
return new ConcurrentLinkedDeque<>();
}
return sessionMap.get(userId).get(path);
}

protected void releaseChannel(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
ChannelModel channelModel = channelModelMap.get(channel);
Object uid = channelModel.getUserId();
String path = channelModel.getPath();
sessionMap.putIfAbsent(uid, new ConcurrentHashMap<>());
sessionMap.get(uid).putIfAbsent(path, new ConcurrentLinkedDeque<>());
ConcurrentLinkedDeque<IWsSession<?>> wsSessionsDeque = sessionMap.get(uid).get(path);
if (wsSessionsDeque == null) {
return;
}
wsSessionsDeque.removeIf(iWsSession -> iWsSession.getChannel() == channel);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,26 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanFactory;
import sun.util.logging.resources.logging;

@Slf4j
public class WebSocketServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

private final WebsocketDispatcher websocketDispatcher;

public WebSocketServerHandler(WebsocketDispatcher pojoEndpointServer) {
this.websocketDispatcher = pojoEndpointServer;
private final BeanFactory beanFactory;

public WebSocketServerHandler(WebsocketDispatcher websocketDispatcher, BeanFactory beanFactory) {
this.websocketDispatcher = websocketDispatcher;
this.beanFactory = beanFactory;
}

@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
SessionGroupManager sessionGroupManager = beanFactory.getBean(SessionGroupManager.class);
sessionGroupManager.releaseChannel(ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (corsConfig != null) {
ch.pipeline().addLast(new CorsHandler(corsConfig));
}
ch.pipeline().addLast("handler", new HttpServerHandler(properties, websocketDispatcher, worker));//自定义的业务handler
ch.pipeline().addLast("handler", new HttpServerHandler(properties, websocketDispatcher, worker, beanFactory));//自定义的业务handler
}
});

Expand Down Expand Up @@ -194,8 +194,10 @@ public void doOnOpen(Channel channel, FullHttpRequest req, String path) throws E

private void setSession(Channel channel, FullHttpRequest req, String path) throws Exception {
IWSSessionAuthenticator<?> authenticator = beanFactory.getBean(properties.getSessionAuthenticator());
SessionGroupManager sessionGroupManager = beanFactory.getBean(SessionGroupManager.class);
IWsSession<?> wsSession = authenticator.authenticate(channel, req, path);
channel.attr(SESSION_KEY).set(wsSession);
sessionGroupManager.addSession(path, wsSession);
}

public void doOnClose(Channel channel) {
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/plus/jdk/websocket/model/ChannelModel.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package plus.jdk.websocket.model;

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class ChannelModel {

/**
* 当前的uid
*/
private Object userId;

/**
* websocket路径
*/
private String path;
}

0 comments on commit 95fde6c

Please sign in to comment.