From 95fde6cbfe1fc6abd2ad566da3a623b7f1d72bae Mon Sep 17 00:00:00 2001 From: "[moon]" <[moon@jdk.plus]> Date: Mon, 18 Jul 2022 23:55:49 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AF=B9session=E7=9A=84?= =?UTF-8?q?=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README-CN.md | 46 ++++++++++++++++- README.md | 45 ++++++++++++++++- pom.xml | 2 +- .../config/WebsocketAutoConfiguration.java | 6 +++ .../websocket/global/HttpServerHandler.java | 11 +++-- .../websocket/global/SessionGroupManager.java | 49 +++++++++++++++++++ .../global/WebSocketServerHandler.java | 17 ++++++- .../websocket/global/WebsocketDispatcher.java | 4 +- .../jdk/websocket/model/ChannelModel.java | 19 +++++++ 9 files changed, 189 insertions(+), 10 deletions(-) create mode 100644 src/main/java/plus/jdk/websocket/global/SessionGroupManager.java create mode 100644 src/main/java/plus/jdk/websocket/model/ChannelModel.java diff --git a/README-CN.md b/README-CN.md index 31f809c..46f2a43 100644 --- a/README-CN.md +++ b/README-CN.md @@ -1,4 +1,6 @@ - +
+ drawing +

这是一款使用netty编写的springboot websocket组件。

@@ -17,7 +19,7 @@ plus.jdk spring-boot-starter-websocket - 1.0.1 + 1.0.2 ``` ## 配置 @@ -197,3 +199,43 @@ public class DemoHandler { } ``` +### 使用websocket连接主动向用户客户端推送消息 + +```java +package plus.jdk.broadcast.test.controller; + +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import plus.jdk.websocket.global.SessionGroupManager; +import plus.jdk.websocket.model.IWsSession; + +import javax.annotation.Resource; +import java.util.concurrent.ConcurrentLinkedDeque; + + +@RestController +public class MessageController { + + /** + * 该bean实例已经在底层封装好 + */ + @Resource + private SessionGroupManager sessionGroupManager; + + @RequestMapping(value = "/message/send", method = {RequestMethod.GET}) + public Object sendMessage(@RequestParam String uid, @RequestParam String content){ + + // 调用sessionGroupManager.getSession()函数获取当前用户在该实例中的所有连接 + // 你可以在 IWSSessionAuthenticator 的实现中自行实现自己的session定义,将消息分发给不同的设备 + // 或向远端上报当前用户的连接到底在哪些机器上 + ConcurrentLinkedDeque> sessions = sessionGroupManager.getSession(uid, "/ws/message"); + for(IWsSession wsSession: sessions) { + wsSession.sendText(content); // 发送文本消息 + wsSession.sendBinary(content.getBytes()); // 发送二进制消息 + } + return "success"; + } +} +``` diff --git a/README.md b/README.md index 1e954cc..32f91e1 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,6 @@ +

+ drawing +

A springboot websocket component written using netty。

@@ -14,7 +17,7 @@ plus.jdk spring-boot-starter-websocket - 1.0.1 + 1.0.2 ``` @@ -195,3 +198,43 @@ public class DemoHandler { } ``` +### Actively push messages to user clients using websocket connections + +```java +package plus.jdk.broadcast.test.controller; + +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; +import plus.jdk.websocket.global.SessionGroupManager; +import plus.jdk.websocket.model.IWsSession; + +import javax.annotation.Resource; +import java.util.concurrent.ConcurrentLinkedDeque; + + +@RestController +public class MessageController { + + /** + * The bean instance is already encapsulated at the bottom + */ + @Resource + private SessionGroupManager sessionGroupManager; + + @RequestMapping(value = "/message/send", method = {RequestMethod.GET}) + public Object sendMessage(@RequestParam String uid, @RequestParam String content){ + + // Call the sessionGroupManager.getSession() function to get all connections of the current user in this instance. + // You can implement your own session definition in the implementation of IWSSessionAuthenticator to distribute messages to different devices + // Or report to the remote end which machines the current user is connected to + ConcurrentLinkedDeque> sessions = sessionGroupManager.getSession(uid, "/ws/message"); + for(IWsSession wsSession: sessions) { + wsSession.sendText(content); // 发送文本消息 + wsSession.sendBinary(content.getBytes()); // 发送二进制消息 + } + return "success"; + } +} +``` diff --git a/pom.xml b/pom.xml index a519a2c..2492e77 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ plus.jdk spring-boot-starter-websocket - 1.0.1 + 1.0.2 spring-boot-starter-websocket A simple websocket component base on netty diff --git a/src/main/java/plus/jdk/websocket/config/WebsocketAutoConfiguration.java b/src/main/java/plus/jdk/websocket/config/WebsocketAutoConfiguration.java index f238c76..2d122df 100644 --- a/src/main/java/plus/jdk/websocket/config/WebsocketAutoConfiguration.java +++ b/src/main/java/plus/jdk/websocket/config/WebsocketAutoConfiguration.java @@ -13,6 +13,7 @@ import org.springframework.web.context.support.WebApplicationObjectSupport; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import plus.jdk.websocket.global.ServerEndpointExporter; +import plus.jdk.websocket.global.SessionGroupManager; import plus.jdk.websocket.global.WebsocketDispatcher; import plus.jdk.websocket.annotations.*; import plus.jdk.websocket.properties.WebsocketProperties; @@ -34,6 +35,11 @@ public class WebsocketAutoConfiguration extends WebApplicationObjectSupport impl public WebsocketAutoConfiguration(WebsocketProperties properties) { } + @Bean + public SessionGroupManager SessionGroupManager() { + return new SessionGroupManager(); + } + @Bean public WebsocketDispatcher WebsocketDispatcher(WebsocketProperties properties){ return new WebsocketDispatcher(properties, beanFactory); diff --git a/src/main/java/plus/jdk/websocket/global/HttpServerHandler.java b/src/main/java/plus/jdk/websocket/global/HttpServerHandler.java index 91a2ab1..76ac67b 100644 --- a/src/main/java/plus/jdk/websocket/global/HttpServerHandler.java +++ b/src/main/java/plus/jdk/websocket/global/HttpServerHandler.java @@ -14,6 +14,7 @@ import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import org.springframework.beans.TypeMismatchException; +import org.springframework.beans.factory.BeanFactory; import org.springframework.util.StringUtils; import plus.jdk.websocket.common.WebsocketCommonException; import plus.jdk.websocket.properties.WebsocketProperties; @@ -31,10 +32,14 @@ public class HttpServerHandler extends SimpleChannelInboundHandler { if (future.isSuccess()) { diff --git a/src/main/java/plus/jdk/websocket/global/SessionGroupManager.java b/src/main/java/plus/jdk/websocket/global/SessionGroupManager.java new file mode 100644 index 0000000..f12b147 --- /dev/null +++ b/src/main/java/plus/jdk/websocket/global/SessionGroupManager.java @@ -0,0 +1,49 @@ +package plus.jdk.websocket.global; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import plus.jdk.websocket.model.ChannelModel; +import plus.jdk.websocket.model.IWsSession; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; + +@Slf4j +public class SessionGroupManager { + + private final ConcurrentHashMap channelModelMap = new ConcurrentHashMap<>(); + + private final ConcurrentHashMap>>> sessionMap = new ConcurrentHashMap<>(); + + protected void addSession(String path, IWsSession iWsSession) { + Object uid = iWsSession.getUserId(); + sessionMap.putIfAbsent(uid, new ConcurrentHashMap<>()); + sessionMap.get(uid).putIfAbsent(path, new ConcurrentLinkedDeque<>()); + sessionMap.get(uid).get(path).add(iWsSession); + channelModelMap.put(iWsSession.getChannel(), new ChannelModel(uid, path)); + } + + public ConcurrentLinkedDeque> getSession(T userId, String path) { + ConcurrentHashMap>> userSessionMap = sessionMap.get(userId); + if (userSessionMap == null || userSessionMap.get(path) == null) { + return new ConcurrentLinkedDeque<>(); + } + return sessionMap.get(userId).get(path); + } + + protected void releaseChannel(ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + ChannelModel channelModel = channelModelMap.get(channel); + Object uid = channelModel.getUserId(); + String path = channelModel.getPath(); + sessionMap.putIfAbsent(uid, new ConcurrentHashMap<>()); + sessionMap.get(uid).putIfAbsent(path, new ConcurrentLinkedDeque<>()); + ConcurrentLinkedDeque> wsSessionsDeque = sessionMap.get(uid).get(path); + if (wsSessionsDeque == null) { + return; + } + wsSessionsDeque.removeIf(iWsSession -> iWsSession.getChannel() == channel); + } + +} diff --git a/src/main/java/plus/jdk/websocket/global/WebSocketServerHandler.java b/src/main/java/plus/jdk/websocket/global/WebSocketServerHandler.java index 4f6b594..36f80be 100644 --- a/src/main/java/plus/jdk/websocket/global/WebSocketServerHandler.java +++ b/src/main/java/plus/jdk/websocket/global/WebSocketServerHandler.java @@ -4,13 +4,26 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.BeanFactory; +import sun.util.logging.resources.logging; +@Slf4j public class WebSocketServerHandler extends SimpleChannelInboundHandler { private final WebsocketDispatcher websocketDispatcher; - public WebSocketServerHandler(WebsocketDispatcher pojoEndpointServer) { - this.websocketDispatcher = pojoEndpointServer; + private final BeanFactory beanFactory; + + public WebSocketServerHandler(WebsocketDispatcher websocketDispatcher, BeanFactory beanFactory) { + this.websocketDispatcher = websocketDispatcher; + this.beanFactory = beanFactory; + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + SessionGroupManager sessionGroupManager = beanFactory.getBean(SessionGroupManager.class); + sessionGroupManager.releaseChannel(ctx); } @Override diff --git a/src/main/java/plus/jdk/websocket/global/WebsocketDispatcher.java b/src/main/java/plus/jdk/websocket/global/WebsocketDispatcher.java index 55f1f3a..de29f10 100644 --- a/src/main/java/plus/jdk/websocket/global/WebsocketDispatcher.java +++ b/src/main/java/plus/jdk/websocket/global/WebsocketDispatcher.java @@ -87,7 +87,7 @@ protected void initChannel(SocketChannel ch) throws Exception { if (corsConfig != null) { ch.pipeline().addLast(new CorsHandler(corsConfig)); } - ch.pipeline().addLast("handler", new HttpServerHandler(properties, websocketDispatcher, worker));//自定义的业务handler + ch.pipeline().addLast("handler", new HttpServerHandler(properties, websocketDispatcher, worker, beanFactory));//自定义的业务handler } }); @@ -194,8 +194,10 @@ public void doOnOpen(Channel channel, FullHttpRequest req, String path) throws E private void setSession(Channel channel, FullHttpRequest req, String path) throws Exception { IWSSessionAuthenticator authenticator = beanFactory.getBean(properties.getSessionAuthenticator()); + SessionGroupManager sessionGroupManager = beanFactory.getBean(SessionGroupManager.class); IWsSession wsSession = authenticator.authenticate(channel, req, path); channel.attr(SESSION_KEY).set(wsSession); + sessionGroupManager.addSession(path, wsSession); } public void doOnClose(Channel channel) { diff --git a/src/main/java/plus/jdk/websocket/model/ChannelModel.java b/src/main/java/plus/jdk/websocket/model/ChannelModel.java new file mode 100644 index 0000000..8f95173 --- /dev/null +++ b/src/main/java/plus/jdk/websocket/model/ChannelModel.java @@ -0,0 +1,19 @@ +package plus.jdk.websocket.model; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class ChannelModel { + + /** + * 当前的uid + */ + private Object userId; + + /** + * websocket路径 + */ + private String path; +}