Skip to content

Commit

Permalink
添加消息推送回调
Browse files Browse the repository at this point in the history
  • Loading branch information
[moon] committed Jul 24, 2022
1 parent f29f1de commit 84d5789
Show file tree
Hide file tree
Showing 10 changed files with 284 additions and 99 deletions.
2 changes: 1 addition & 1 deletion README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<dependency>
<groupId>plus.jdk</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.0.5</version>
<version>1.0.7</version>
</dependency>
```
## 配置
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<dependency>
<groupId>plus.jdk</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.0.5</version>
<version>1.0.7</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>plus.jdk</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>

<name>spring-boot-starter-websocket</name>
<description>A simple websocket component base on netty </description>
Expand Down
11 changes: 7 additions & 4 deletions protoc/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ enum MessageType {
}

message WsMessage {
optional string uid = 1;

optional string path = 2;
optional string message_id = 1;

optional bytes data = 3;
optional string uid = 2;

optional MessageType type = 4;
optional string path = 3;

optional bytes data = 4;

optional MessageType type = 5;
}
11 changes: 11 additions & 0 deletions src/main/java/plus/jdk/websocket/global/IBroadMessagePromise.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package plus.jdk.websocket.global;

import io.netty.channel.Channel;
import plus.jdk.websocket.model.IWsSession;
import plus.jdk.websocket.protoc.WsMessage;


public interface IBroadMessagePromise {

void onCompletion(boolean success, WsMessage wsMessage, IWsSession<?> session);
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package plus.jdk.websocket.global;

import com.google.gson.Gson;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.PeriodicTrigger;
import plus.jdk.broadcast.broadcaster.UdpBroadcastMessageMonitor;
import plus.jdk.broadcast.broadcaster.UdpMessageBroadcaster;
import plus.jdk.broadcast.broadcaster.model.BroadcastMessage;
Expand Down Expand Up @@ -71,26 +67,41 @@ protected void sendBroadcast(Object userId, String path, byte[] data, Monitor[]

@Override
public void run(ApplicationArguments args) throws Exception {
if(properties.getBroadcastMonitorPort() <= 0) {
if (properties.getBroadcastMonitorPort() <= 0) {
return;
}
Thread thread = new Thread(() -> udpBroadcastMessageMonitor.subscribe((ctx, msg) -> {
WsMessage wsMessage = WsMessage.parseFrom(msg.getContent());
SessionGroupManager sessionGroupManager = beanFactory.getBean(SessionGroupManager.class);
ConcurrentLinkedDeque<IWsSession<?>> sessions = sessionGroupManager.getSession(wsMessage.getUid(), wsMessage.getPath());
if(properties.getPrintBroadcastMessage()) {
if (properties.getPrintBroadcastMessage()) {
log.info("receive broadcast message: {}", wsMessage);
}
for (IWsSession<?> session : sessions) {
if(MessageType.MESSAGE_TYPE_TEXT.equals(wsMessage.getType())) {
session.sendText(new String(wsMessage.getData().toByteArray()));
ChannelFuture future = null;
if (MessageType.MESSAGE_TYPE_TEXT.equals(wsMessage.getType())) {
future = session.sendText(new String(wsMessage.getData().toByteArray()));
}
if(MessageType.MESSAGE_TYPE_BINARY.equals(wsMessage.getType())) {
session.sendBinary(wsMessage.getData().toByteArray());
if (MessageType.MESSAGE_TYPE_BINARY.equals(wsMessage.getType())) {
future = session.sendBinary(wsMessage.getData().toByteArray());
}
if (future == null) {
continue;
}
future.addListener((ChannelFutureListener) channelFuture -> {
if(properties.getMessagePushPromise() == null) {
return;
}
try {
IBroadMessagePromise promise = beanFactory.getBean(properties.getMessagePushPromise());
promise.onCompletion(channelFuture.isSuccess(), wsMessage, session);
}catch(Exception e) {
log.error(e.getMessage());
}
});
}
return true;
}));
thread.start();
Runtime.getRuntime().addShutdownHook(thread);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import plus.jdk.websocket.global.DefaultSessionAuthenticatorManager;
import plus.jdk.websocket.global.IBroadMessagePromise;
import plus.jdk.websocket.global.IWSSessionAuthenticatorManager;
import plus.jdk.websocket.model.IWsSession;

Expand Down Expand Up @@ -98,4 +99,9 @@ public class WebsocketProperties {
* 认证器
*/
private Class<? extends IWSSessionAuthenticatorManager<?, ? extends IWsSession<?>>> sessionAuthenticator = DefaultSessionAuthenticatorManager.class;

/**
* 广播处理结果
*/
private Class<? extends IBroadMessagePromise> messagePushPromise;
}
11 changes: 6 additions & 5 deletions src/main/java/plus/jdk/websocket/protoc/WsBroadcastMessage.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 84d5789

Please sign in to comment.