Skip to content

Commit

Permalink
websocket:重新封装 websocket 组件,支持 token 认证,并增加 WebSocketMessageListener …
Browse files Browse the repository at this point in the history
…方便处理消息
  • Loading branch information
YunaiV committed Nov 25, 2023
1 parent 522ab17 commit 2d9aa7a
Show file tree
Hide file tree
Showing 57 changed files with 1,929 additions and 36 deletions.
6 changes: 6 additions & 0 deletions yudao-dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@
<version>${revision}</version>
</dependency>

<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-websocket</artifactId>
<version>${revision}</version>
</dependency>

<dependency>
<groupId>org.springdoc</groupId> <!-- 接口文档 UI:默认 -->
<artifactId>springdoc-openapi-ui</artifactId>
Expand Down
1 change: 1 addition & 0 deletions yudao-framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

<module>yudao-spring-boot-starter-flowable</module>
<module>yudao-spring-boot-starter-captcha</module>
<module>yudao-spring-boot-starter-websocket</module>
<module>yudao-spring-boot-starter-desensitize</module>
</modules>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,15 @@ public static <T, U> List<U> convertListByFlatMap(Collection<T> from,
return from.stream().flatMap(func).filter(Objects::nonNull).collect(Collectors.toList());
}

public static <T, U, R> List<R> convertListByFlatMap(Collection<T> from,
Function<? super T, ? extends U> mapper,
Function<U, ? extends Stream<? extends R>> func) {
if (CollUtil.isEmpty(from)) {
return new ArrayList<>();
}
return from.stream().map(mapper).flatMap(func).filter(Objects::nonNull).collect(Collectors.toList());
}

public static <T, U> Set<U> convertSetByFlatMap(Collection<T> from,
Function<T, ? extends Stream<? extends U>> func) {
if (CollUtil.isEmpty(from)) {
Expand All @@ -288,4 +297,13 @@ public static <T, U> Set<U> convertSetByFlatMap(Collection<T> from,
return from.stream().flatMap(func).filter(Objects::nonNull).collect(Collectors.toSet());
}

public static <T, U, R> Set<R> convertSetByFlatMap(Collection<T> from,
Function<? super T, ? extends U> mapper,
Function<U, ? extends Stream<? extends R>> func) {
if (CollUtil.isEmpty(from)) {
return new HashSet<>();
}
return from.stream().map(mapper).flatMap(func).filter(Objects::nonNull).collect(Collectors.toSet());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
Expand All @@ -23,7 +22,6 @@
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
Expand All @@ -33,30 +31,19 @@
import java.util.Properties;

/**
* 消息队列配置类
* Redis 消息队列 Consumer 配置类
*
* @author 芋道源码
*/
@Slf4j
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQAutoConfiguration {

@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
List<RedisMessageInterceptor> interceptors) {
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
// 添加拦截器
interceptors.forEach(redisMQTemplate::addInterceptor);
return redisMQTemplate;
}

// ========== 消费者相关 ==========
public class YudaoRedisMQConsumerAutoConfiguration {

/**
* 创建 Redis Pub/Sub 广播消费的容器
*/
@Bean(initMethod = "start", destroyMethod = "stop")
@Bean
@ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisMQTemplate redisMQTemplate, List<AbstractRedisChannelMessageListener<?>> listeners) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package cn.iocoder.yudao.framework.mq.redis.config;

import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.List;

/**
* Redis 消息队列 Producer 配置类
*
* @author 芋道源码
*/
@Slf4j
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
public class YudaoRedisMQProducerAutoConfiguration {

@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
List<RedisMessageInterceptor> interceptors) {
RedisMQTemplate redisMQTemplate = new RedisMQTemplate(redisTemplate);
// 添加拦截器
interceptors.forEach(redisMQTemplate::addInterceptor);
return redisMQTemplate;
}

}
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQProducerAutoConfiguration
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQConsumerAutoConfiguration
cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.toolkit.Db;
import com.github.yulichang.base.MPJBaseMapper;
import com.github.yulichang.interfaces.MPJBaseJoin;
import org.apache.ibatis.annotations.Param;

import java.util.Collection;
Expand Down Expand Up @@ -39,6 +40,13 @@ default PageResult<T> selectPage(PageParam pageParam, @Param("ew") Wrapper<T> qu
return new PageResult<>(mpPage.getRecords(), mpPage.getTotal());
}

default <DTO> PageResult<DTO> selectJoinPage(PageParam pageParam, Class<DTO> resultTypeClass, MPJBaseJoin<T> joinQueryWrapper) {
IPage<DTO> mpPage = MyBatisUtils.buildPage(pageParam);
selectJoinPage(mpPage, resultTypeClass, joinQueryWrapper);
// 转换返回
return new PageResult<>(mpPage.getRecords(), mpPage.getTotal());
}

default T selectOne(String field, Object value) {
return selectOne(new QueryWrapper<T>().eq(field, value));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ public class SecurityProperties {
*/
@NotEmpty(message = "Token Header 不能为空")
private String tokenHeader = "Authorization";
/**
* HTTP 请求时,访问令牌的请求参数
*
* 初始目的:解决 WebSocket 无法通过 header 传参,只能通过 token 参数拼接
*/
@NotEmpty(message = "Token Parameter 不能为空")
private String tokenParameter = "token";

/**
* mock 模式的开关
Expand All @@ -41,5 +48,4 @@ public class SecurityProperties {
* PasswordEncoder 加密复杂度,越高开销越大
*/
private Integer passwordEncoderLength = 4;

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ protected void doFilterInternal(HttpServletRequest request, HttpServletResponse
// 情况二,基于 Token 获得用户
// 注意,这里主要满足直接使用 Nginx 直接转发到 Spring Cloud 服务的场景。
if (loginUser == null) {
String token = SecurityFrameworkUtils.obtainAuthorization(request, securityProperties.getTokenHeader());
String token = SecurityFrameworkUtils.obtainAuthorization(request,
securityProperties.getTokenHeader(), securityProperties.getTokenParameter());
if (StrUtil.isNotEmpty(token)) {
Integer userType = WebFrameworkUtils.getLoginUserType(request);
try {
Expand Down Expand Up @@ -82,7 +83,10 @@ private LoginUser buildLoginUserByToken(String token, Integer userType) {
return null;
}
// 用户类型不匹配,无权限
if (ObjectUtil.notEqual(accessToken.getUserType(), userType)) {
// 注意:只有 /admin-api/* 和 /app-api/* 有 userType,才需要比对用户类型
// 类似 WebSocket 的 /ws/* 连接地址,是不需要比对用户类型的
if (userType != null
&& ObjectUtil.notEqual(accessToken.getUserType(), userType)) {
throw new AccessDeniedException("错误的用户类型");
}
// 构建登录用户
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cn.iocoder.yudao.framework.security.core.util;

import cn.hutool.core.util.StrUtil;
import cn.iocoder.yudao.framework.security.core.LoginUser;
import cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils;
import org.springframework.lang.Nullable;
Expand All @@ -20,6 +21,9 @@
*/
public class SecurityFrameworkUtils {

/**
* HEADER 认证头 value 的前缀
*/
public static final String AUTHORIZATION_BEARER = "Bearer";

public static final String LOGIN_USER_HEADER = "login-user";
Expand All @@ -30,19 +34,23 @@ private SecurityFrameworkUtils() {}
* 从请求中,获得认证 Token
*
* @param request 请求
* @param header 认证 Token 对应的 Header 名字
* @param headerName 认证 Token 对应的 Header 名字
* @param parameterName 认证 Token 对应的 Parameter 名字
* @return 认证 Token
*/
public static String obtainAuthorization(HttpServletRequest request, String header) {
String authorization = request.getHeader(header);
if (!StringUtils.hasText(authorization)) {
return null;
public static String obtainAuthorization(HttpServletRequest request,
String headerName, String parameterName) {
// 1. 获得 Token。优先级:Header > Parameter
String token = request.getHeader(headerName);
if (StrUtil.isEmpty(token)) {
token = request.getParameter(parameterName);
}
int index = authorization.indexOf(AUTHORIZATION_BEARER + " ");
if (index == -1) { // 未找到
if (!StringUtils.hasText(token)) {
return null;
}
return authorization.substring(index + 7).trim();
// 2. 去除 Token 中带的 Bearer
int index = token.indexOf(AUTHORIZATION_BEARER + " ");
return index >= 0 ? token.substring(index + 7).trim() : token;
}

/**
Expand Down
84 changes: 84 additions & 0 deletions yudao-framework/yudao-spring-boot-starter-websocket/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-framework</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>yudao-spring-boot-starter-websocket</artifactId>
<packaging>jar</packaging>

<name>${project.artifactId}</name>
<description>WebSocket 框架,支持多节点的广播</description>
<url>https://github.com/YunaiV/ruoyi-vue-pro</url>


<dependencies>
<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-common</artifactId>
</dependency>

<!-- Web 相关 -->
<dependency>
<!-- 为什么是 websocket 依赖 security 呢?而不是 security 拓展 websocket 呢?
因为 websocket 和 LoginUser 当前登录的用户有一定的相关性,具体可见 WebSocketSessionManagerImpl 逻辑。
如果让 security 拓展 websocket 的话,会导致 websocket 组件的封装很散,进而增大理解成本。
-->
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-security</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<!-- Web 相关 -->
<dependency>
<!-- 为什么是 websocket 依赖 security 呢?而不是 security 拓展 websocket 呢?
因为 websocket 和 LoginUser 当前登录的用户有一定的相关性,具体可见 WebSocketSessionManagerImpl 逻辑。
如果让 security 拓展 websocket 的话,会导致 websocket 组件的封装很散,进而增大理解成本。
-->
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-security</artifactId>
<scope>provided</scope>
</dependency>

<!-- 消息队列相关 -->
<dependency>
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-mq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<optional>true</optional>
</dependency>

<!-- 业务组件 -->
<dependency>
<!-- 为什么要依赖 tenant 组件?
因为广播某个类型的用户时候,需要根据租户过滤下,避免广播到别的租户!
-->
<groupId>cn.iocoder.cloud</groupId>
<artifactId>yudao-spring-boot-starter-biz-tenant</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cn.iocoder.yudao.framework.websocket.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.validation.annotation.Validated;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

/**
* WebSocket 配置项
*
* @author xingyu4j
*/
@ConfigurationProperties("yudao.websocket")
@Data
@Validated
public class WebSocketProperties {

/**
* WebSocket 的连接路径
*/
@NotEmpty(message = "WebSocket 的连接路径不能为空")
private String path = "/ws";

/**
* 消息发送器的类型
*
* 可选值:local、redis、rocketmq、kafka、rabbitmq
*/
@NotNull(message = "WebSocket 的消息发送者不能为空")
private String senderType = "local";

}
Loading

0 comments on commit 2d9aa7a

Please sign in to comment.