Skip to content

Commit

Permalink
添加websocket集群广播解决方案
Browse files Browse the repository at this point in the history
  • Loading branch information
[moon] committed Jul 22, 2022
1 parent 8f9ecbc commit 3d160c5
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 18 deletions.
42 changes: 33 additions & 9 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<a href="https://github.com/JDK-Plus/spring-boot-starter-websocket/stargazers"><img src="https://img.shields.io/github/stars/JDK-Plus/spring-boot-starter-websocket.svg" /></a>
<a href="https://github.com/JDK-Plus/spring-boot-starter-websocket/network/members"><img src="https://img.shields.io/github/forks/JDK-Plus/spring-boot-starter-websocket.svg" /></a>
</p>
<p align="center">这是一款支持集群广播的使用netty编写的websocket组件,完美解决websocket和用户单机无法和整个业务集群通信的问题</p>


- [English](README-CN.md)
Expand Down Expand Up @@ -67,6 +68,9 @@ plus.jdk.websocket.log-level=debug
# udp广播监听端口
plus.jdk.websocket.broadcast-monitor-port=10300
# udp广播监听端口,若小于等于0,则不监听消息
plus.jdk.websocket.broadcast-monitor-port=10300
```


Expand Down Expand Up @@ -107,23 +111,40 @@ public class MyWsSession implements IWsSession<String> {
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpRequest;
import org.springframework.stereotype.Component;
import plus.jdk.broadcast.model.Monitor;
import plus.jdk.websocket.common.HttpWsRequest;
import plus.jdk.websocket.global.IWSSessionAuthenticatorManager;
import plus.jdk.websocket.model.IWsSession;
import plus.jdk.websocket.properties.WebsocketProperties;

@Component
public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager<MyWsSession> {
public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager<String, MyWsSession> {

/**
* 握手阶段验证session信息,若验证不通过,直接抛出异常终止握手流程。
* 若验证成功,则返回自定义的session,并使用redis之类的公用存储服务记录当前用户和哪台机器建立了连接
*/
@Override
public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path) {
public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path, WebsocketProperties properties) {
HttpWsRequest httpWsRequest = new HttpWsRequest(req);
String uid = httpWsRequest.getQueryValue("uid");
return new MyWsSession(uid, channel);
}

/**
* 当连接断开时,销毁session的回调
*/
@Override
public void onSessionDestroy(IWsSession<?> session, String path, WebsocketProperties properties) {

}

/**
* 返回当前用户和哪些机器建立了连接,需要向这些机器发送广播推送消息
*/
@Override
public void onSessionDestroy(IWsSession<MyWsSession> session) {
IWSSessionAuthenticatorManager.super.onSessionDestroy(session);
public Monitor[] getUserConnectedMachine(String userId, String path, WebsocketProperties properties) {
return new Monitor[]{new Monitor("127.0.0.1", properties.getBroadcastMonitorPort())};
}
}
```
Expand Down Expand Up @@ -219,15 +240,18 @@ public class MessageController {

@RequestMapping(value = "/message/send", method = {RequestMethod.GET})
public Object sendMessage(@RequestParam String uid, @RequestParam String content){

// 调用sessionGroupManager.getSession()函数获取当前用户在该实例中的所有连接
// 你可以在 IWSSessionAuthenticator 的实现中自行实现自己的session定义,将消息分发给不同的设备
// 或向远端上报当前用户的连接到底在哪些机器上
ConcurrentLinkedDeque<IWsSession<?>> sessions = sessionGroupManager.getSession(uid, "/ws/message");
for(IWsSession<?> wsSession: sessions) {
wsSession.sendText(content); // 发送文本消息
wsSession.sendBinary(content.getBytes()); // 发送二进制消息
}

// 发送文本消息
// 如果你按要求实现了IWSSessionAuthenticatorManager接口中的getUserConnectedMachine方法,那么将会向已经和用户建立连接的机器发送广播,推送消息

sessionGroupManager.sendText(uid, "/ws/message", content);

// 发送二进制消息
sessionGroupManager.sendBinary(uid, "/ws/message", content.getBytes(StandardCharsets.UTF_8));
return "success";
}
}
Expand Down
46 changes: 37 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<a href="https://github.com/JDK-Plus/spring-boot-starter-websocket/stargazers"><img src="https://img.shields.io/github/stars/JDK-Plus/spring-boot-starter-websocket.svg" /></a>
<a href="https://github.com/JDK-Plus/spring-boot-starter-websocket/network/members"><img src="https://img.shields.io/github/forks/JDK-Plus/spring-boot-starter-websocket.svg" /></a>
</p>
<p align="center">This is a websocket component written in netty that supports cluster broadcasting, Perfectly solve the problem that websocket and user single machine cannot communicate with the entire business cluster</p>

- [中文文档](README-CN.md)

Expand Down Expand Up @@ -63,6 +64,9 @@ plus.jdk.websocket.event-executor-group-threads=0
# log level
plus.jdk.websocket.log-level=debug
# udp broadcast listening port, if it is less than or equal to 0, it will not listen for messages
plus.jdk.websocket.broadcast-monitor-port=10300
```

## Example of use
Expand Down Expand Up @@ -103,23 +107,42 @@ The usage example is as follows:
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpRequest;
import org.springframework.stereotype.Component;
import plus.jdk.broadcast.model.Monitor;
import plus.jdk.websocket.common.HttpWsRequest;
import plus.jdk.websocket.global.IWSSessionAuthenticatorManager;
import plus.jdk.websocket.model.IWsSession;
import plus.jdk.websocket.properties.WebsocketProperties;

@Component
public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager<MyWsSession> {
public class WSSessionAuthenticator implements IWSSessionAuthenticatorManager<String, MyWsSession> {

/**
* The session information is verified in the connection handshake stage.
* If the verification fails, an exception is thrown directly to terminate the handshake process.
* If the verification is successful, return a custom session, and use a public storage service such as redis to
* record which machine the current user has established a connection with
*/
@Override
public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path) {
public MyWsSession authenticate(Channel channel, FullHttpRequest req, String path, WebsocketProperties properties) {
HttpWsRequest httpWsRequest = new HttpWsRequest(req);
String uid = httpWsRequest.getQueryValue("uid");
return new MyWsSession(uid, channel);
}

/**
* When the connection is disconnected, the callback for destroying the session
*/
@Override
public void onSessionDestroy(IWsSession<?> session, String path, WebsocketProperties properties) {

}

/**
* Returns which machines the current user has established a connection with, and needs to send broadcast push messages to these machines
*/
@Override
public void onSessionDestroy(IWsSession<MyWsSession> session) {
IWSSessionAuthenticatorManager.super.onSessionDestroy(session);
public Monitor[] getUserConnectedMachine(String userId, String path, WebsocketProperties properties) {
return new Monitor[]{new Monitor("127.0.0.1", properties.getBroadcastMonitorPort())};
}
}
```
Expand Down Expand Up @@ -201,6 +224,7 @@ import plus.jdk.websocket.global.SessionGroupManager;
import plus.jdk.websocket.model.IWsSession;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentLinkedDeque;


Expand All @@ -214,16 +238,20 @@ public class MessageController {
private SessionGroupManager sessionGroupManager;

@RequestMapping(value = "/message/send", method = {RequestMethod.GET})
public Object sendMessage(@RequestParam String uid, @RequestParam String content){
public Object sendMessage(@RequestParam String uid, @RequestParam String content) {

// Call the sessionGroupManager.getSession() function to get all connections of the current user in this instance.
// You can implement your own session definition in the implementation of IWSSessionAuthenticator to distribute messages to different devices
// Or report to the remote end which machines the current user is connected to
ConcurrentLinkedDeque<IWsSession<?>> sessions = sessionGroupManager.getSession(uid, "/ws/message");
for(IWsSession<?> wsSession: sessions) {
wsSession.sendText(content); // 发送文本消息
wsSession.sendBinary(content.getBytes()); // 发送二进制消息
}

// send text message,
// If you implement the getUserConnectedMachine method in the IWSSessionAuthenticatorManager interface as required,
// it will send a broadcast and push message to the machine that has established a connection with the user
sessionGroupManager.sendText(uid, "/ws/message", content);

// send binary message
sessionGroupManager.sendBinary(uid, "/ws/message", content.getBytes(StandardCharsets.UTF_8));
return "success";
}
}
Expand Down

0 comments on commit 3d160c5

Please sign in to comment.