Skip to content

Commit

Permalink
feat: 消息扩展字段增加双向删除的属性
Browse files Browse the repository at this point in the history
  • Loading branch information
tangtaoit committed Dec 18, 2023
1 parent a279d12 commit 28c5635
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 30 deletions.
3 changes: 3 additions & 0 deletions WuKongIMSDK/Classes/WKOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ typedef WKConnectInfo*_Nonnull(^WKConnectInfoCallback)(void);
@property(nonatomic,assign) NSInteger expireMsgCheckInterval; // 过期消息检查间隔 单位秒
@property(nonatomic,assign) NSInteger expireMsgLimit; // 过期消息每次查询数量

@property(nonatomic,assign) NSInteger sendFrequency; // 消息发送延迟时间 单位毫秒
@property(nonatomic,assign) NSInteger sendMaxCountOfEach; // 消息每次发送最大数量

@end

NS_ASSUME_NONNULL_END
3 changes: 3 additions & 0 deletions WuKongIMSDK/Classes/WKOptions.m
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ -(id) init {
self.receiptFlushInterval = 2;

self.channelRequestMaxLimit = 10;

self.sendFrequency = 100;
self.sendMaxCountOfEach = 5;
}
return self;
}
Expand Down
3 changes: 3 additions & 0 deletions WuKongIMSDK/Classes/db/WKMessageDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ NS_ASSUME_NONNULL_BEGIN

-(void) deleteMessagesWithClientSeqs:(NSArray<NSNumber*>*)ids;

-(void) deleteMessagesWithMessageIDs:(NSArray<NSNumber*>*)messageIDs;

-(void) deleteMessagesWithMessageIDs:(NSArray<NSNumber*>*)messageIDs db:(FMDatabase*)db;

/**
彻底将消息从数据库删除 (deleteMessage只是标记为删除)
Expand Down
24 changes: 24 additions & 0 deletions WuKongIMSDK/Classes/db/WKMessageDB.m
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@
#define SQL_MESSAGE_UPDATE_EXTRA [NSString stringWithFormat:@"update %@ set extra=? where id=?",TB_MESSAGE]
// 删除指定id的消息
#define SQL_MESSAGE_DELETE_MESSAGE_ID [NSString stringWithFormat:@"update %@ set is_deleted=1 where message_id=?",TB_MESSAGE]

#define SQL_MESSAGE_DELETE_MESSAGE_IDS [NSString stringWithFormat:@"update %@ set is_deleted=1 where message_id in ",TB_MESSAGE]

// 删除指定id的消息
#define SQL_MESSAGE_DELETE_CLIENT_SEQ [NSString stringWithFormat:@"update %@ set is_deleted=1 where id=?",TB_MESSAGE]

Expand Down Expand Up @@ -770,6 +773,27 @@ -(void) deleteMessagesWithClientSeqs:(NSArray<NSNumber*>*)ids {
}];
}

-(void) deleteMessagesWithMessageIDs:(NSArray<NSNumber*>*)messageIDs {
__weak typeof(self) weakSelf = self;
[WKDB.sharedDB.dbQueue inDatabase:^(FMDatabase * _Nonnull db) {
[weakSelf deleteMessagesWithMessageIDs:messageIDs db:db];
}];
}

-(void) deleteMessagesWithMessageIDs:(NSArray<NSNumber*>*)messageIDs db:(FMDatabase*)db {
if(messageIDs && messageIDs.count==0) {
return;
}
if(messageIDs.count == 1) {
NSNumber *messageID = messageIDs[0];
[db executeUpdate:SQL_MESSAGE_DELETE_MESSAGE_ID,messageID];
}else {
NSString *idStrs = [messageIDs componentsJoinedByString:@","];
[db executeUpdate:[NSString stringWithFormat:@"%@ (%@)",SQL_MESSAGE_DELETE_MESSAGE_IDS,idStrs]];
}

}

- (void)destoryMessage:(WKMessage *)message {

[[WKDB sharedDB].dbQueue inDatabase:^(FMDatabase * _Nonnull db) {
Expand Down
2 changes: 2 additions & 0 deletions WuKongIMSDK/Classes/db/WKMessageExtraDB.m
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ -(void) addOrUpdateMessageExtras:(NSArray<WKMessageExtra*>*)messageExtras {
return;
}
[[WKDB sharedDB].dbQueue inTransaction:^(FMDatabase * _Nonnull db, BOOL * _Nonnull rollback) {
NSMutableArray<NSNumber*> *needDeleteMessageIDs = [NSMutableArray array];
for (WKMessageExtra *messageExtra in messageExtras) {
NSString *extraStr = @"";
if(messageExtra.extra) {
Expand All @@ -65,6 +66,7 @@ -(void) addOrUpdateMessageExtras:(NSArray<WKMessageExtra*>*)messageExtras {
readedAt = [messageExtra.readedAt timeIntervalSince1970];
}
[db executeUpdate:SQL_MESSAGE_EXTRA_INSERT_OR_UPDATE,@(messageExtra.messageID),@(messageExtra.messageSeq),messageExtra.channelID?:@"",@(messageExtra.channelType),@(messageExtra.readed),@(readedAt),@(messageExtra.readedCount),@(messageExtra.unreadCount),@(messageExtra.revoke),messageExtra.revoker?:@"",messageExtra.contentEditData?:@"",@(messageExtra.editedAt),extraStr,@(messageExtra.extraVersion)];

}
}];
}
Expand Down
91 changes: 63 additions & 28 deletions WuKongIMSDK/Classes/manager/WKChatManager.m
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#import "WKMessageExtraDB.h"
#import "WKConversationManagerInner.h"
#import "WKConversationLastMessageAndUnreadCount.h"
#import "WKMessageQueueManager.h"

@interface WKChatManager ()

Expand Down Expand Up @@ -253,32 +254,37 @@ -(WKMessage*) sendMessage:(WKMessage*)message {

-(WKMessage*) sendMessage:(WKMessage*)message addRetryQueue:(BOOL)addRetryQueue{


dispatch_async(self.sendMessageQueue,^{
// 发送消息
WKSendPacket *sendPacket = [WKSendPacket new];
sendPacket.header.showUnread = message.header?message.header.showUnread:0;
sendPacket.header.noPersist = message.header?message.header.noPersist:0;
WKSetting *setting = message.setting;
if(message.topic && ![message.topic isEqualToString:@""]) {
setting.topic = true;
}
sendPacket.setting = setting;
sendPacket.clientSeq = message.clientSeq;
sendPacket.clientMsgNo = message.clientMsgNo;
sendPacket.channelId = message.channel.channelId;
sendPacket.channelType = message.channel.channelType;
sendPacket.expire = message.expire;
sendPacket.topic = message.topic;
sendPacket.payload = message.content.encode;

if(addRetryQueue) {
// 添加到重试队列
[[WKRetryManager shared] add:message];
}
[[[WKSDK shared] connectionManager] sendPacket:sendPacket];

});
[WKMessageQueueManager.shared sendMessage:message];

// dispatch_async(self.sendMessageQueue,^{
// // 发送消息
// WKSendPacket *sendPacket = [WKSendPacket new];
// sendPacket.header.showUnread = message.header?message.header.showUnread:0;
// sendPacket.header.noPersist = message.header?message.header.noPersist:0;
// WKSetting *setting = message.setting;
// if(message.topic && ![message.topic isEqualToString:@""]) {
// setting.topic = true;
// }
// sendPacket.setting = setting;
// sendPacket.clientSeq = message.clientSeq;
// sendPacket.clientMsgNo = message.clientMsgNo;
// sendPacket.channelId = message.channel.channelId;
// sendPacket.channelType = message.channel.channelType;
// sendPacket.expire = message.expire;
// sendPacket.topic = message.topic;
// sendPacket.payload = message.content.encode;
//
// if(addRetryQueue) {
// // 添加到重试队列
// [[WKRetryManager shared] add:message];
// }
// [[[WKSDK shared] connectionManager] sendPacket:sendPacket];
//
// });



Expand Down Expand Up @@ -452,13 +458,16 @@ -(void) handleSendack:(NSArray<WKSendackPacket*> *)sendackArray {
}

// 调用委托
NSArray<WKMessage*> *messages = [[WKMessageDB shared] getMessagesWithClientSeqs:clientIDs];
if(messages && messages.count>0) {
for (NSInteger i=0; i<messages.count; i++) {
WKMessage *message = messages[i];
[self callMessageUpdateDelegate:message left:messages.count-1-i total:messages.count];
if(clientIDs.count>0) {
NSArray<WKMessage*> *messages = [[WKMessageDB shared] getMessagesWithClientSeqs:clientIDs];
if(messages && messages.count>0) {
NSLog(@"messages--aaaa------->%lu",messages.count);
for (NSInteger i=0; i<messages.count; i++) {
WKMessage *message = messages[i];
[self callMessageUpdateDelegate:message left:messages.count-1-i total:messages.count];
}
[self addOrUpdateConversationWithMessages:messages];
}
[self addOrUpdateConversationWithMessages:messages];
}
for (NSInteger i=0; i<sendackArray.count; i++) {
[self callSendackDelegate:sendackArray[i] left:sendackArray.count-(i+1)];
Expand Down Expand Up @@ -1426,8 +1435,14 @@ -(void) updateMessageRemoteExtra:(WKMessage*)message {
NSLog(@"更新远程消息扩展失败!->%@",error);
return;
}

[[WKMessageExtraDB shared] addOrUpdateMessageExtras:@[message.remoteExtra]];
[weakSelf callMessageUpdateDelegate:message];

if(message.remoteExtra.isMutualDeleted) { // 如果是双向删除 则删除此消息
[weakSelf deleteMessage:message];
}

});
}

Expand Down Expand Up @@ -1701,11 +1716,31 @@ -(void) syncMessageExtra:(WKChannel*)channel complete:(void(^)(NSError *error))c
if(messages && messages.count>0) {
NSDictionary *reactionDict= [[WKReactionDB shared] getReactionDictionary:messageIDs];
NSInteger i = messages.count - 1;

// 消息更新通知
for (WKMessage *message in messages) {
message.reactions = reactionDict[[NSString stringWithFormat:@"%llu", message.messageId]];
[weakSelf callMessageUpdateDelegate:message left:i total:messages.count];
i--;
}

// 消息删除通知
NSMutableArray<WKMessage*> *deletedMessages = [NSMutableArray array];
for (WKMessage *message in messages) {
for (WKMessageExtra *messageExtra in results) {
if(messageExtra.isMutualDeleted) {
message.isDeleted = true;
[deletedMessages addObject:message];
break;
}
}
}
if(deletedMessages.count>0) {
for (WKMessage *message in deletedMessages) {
[weakSelf deleteMessage:message];
}
}

}
// [weakSelf updateMessageExtraFromRemote:results];
}
Expand Down
1 change: 1 addition & 0 deletions WuKongIMSDK/Classes/manager/WKConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ typedef enum : NSUInteger {
*/
-(void) sendPacket:(WKPacket*)packet;

-(void) writeData:(NSData*) data;

/**
发送ping包
Expand Down
24 changes: 22 additions & 2 deletions WuKongIMSDK/Classes/manager/WKConnectionManager.m
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#import "WKReminderManager.h"
#import "WKChatManagerInner.h"
#import "WKConversationManagerInner.h"
#import "WKMessageQueueManager.h"

@interface WKConnectionManager ()<GCDAsyncSocketDelegate>

Expand Down Expand Up @@ -428,7 +429,11 @@ -(void) sendPing{
// 发送包
-(void) sendPacket:(WKPacket*)packet{
NSData *data = [[WKSDK shared].coder encode:packet];
[self.ssocket writeData:data withTimeout:-1 tag:0];
[self writeData:data];
}

-(void) writeData:(NSData*) data {
[self.ssocket writeData:data withTimeout:1 tag:0];
}

- (NSLock *)delegateLock {
Expand All @@ -453,8 +458,10 @@ - (void)removeDelegate:(id<WKConnectionManagerDelegate>) delegate {
-(void) connectStatusChange {
if(self.connectStatusInner == WKConnected) {
[[WKRetryManager shared] start];
[WKMessageQueueManager.shared start];
}else {
[[WKRetryManager shared] stop];
[[WKMessageQueueManager shared] stop];
}
if(self.onConnectStatusChange) {
self.onConnectStatusChange(self.connectStatusInner);
Expand Down Expand Up @@ -519,6 +526,9 @@ -(void) unpacket:(NSData*)packetData callback:(void(^) (NSArray<NSData*> *data)
[dataList addObject:data];
}];
lenAfter = self.tempBufferData.length;
if(lenAfter>0) {
NSLog(@"有剩余未被解析的包->%lu",lenAfter);
}
} while (lenBefore != lenAfter && lenAfter >= 1);
if (dataList.count > 0) {
callback(dataList);
Expand Down Expand Up @@ -568,19 +578,23 @@ -(NSMutableData*) unpackOneLM:(NSMutableData*)packData callback:(void(^) (NSData
} while (hasLength);

if (!remLengthFull) {
NSLog(@"包长度没有读出来");
return packData;
}
int remLengthLength = pos - fixedHeaderLength; // 剩余长度的长度
if (fixedHeaderLength + remLengthLength + remLength > length) {
// 固定头的长度 + 剩余长度的长度 + 剩余长度 如果大于 总长度说明分包了
NSLog(@"分包了...");
return packData;
}else {
if (fixedHeaderLength + remLengthLength + remLength == length) {
// 刚好一个包
NSLog(@"刚好一个包");
callback(packData);
return [[NSMutableData alloc] init];
} else {
// 粘包 大于1个包
NSLog(@"粘包 大于1个包");
int packetLength = fixedHeaderLength + remLengthLength + remLength;;
callback([packData subdataWithRange:NSMakeRange(0, packetLength)]);
return [[NSMutableData alloc] initWithData:[packData subdataWithRange:NSMakeRange(packetLength, length-packetLength)]];
Expand Down Expand Up @@ -653,6 +667,7 @@ -(void) handlePacketData:(NSArray<NSData*>*)dataList {
}

-(void) handlePackets:(NSArray<WKPacket*>*)packets {
NSLog(@"handlePackets----------------start---->%lu",packets.count);
NSDictionary<NSNumber*,NSArray<WKPacket*>*>* packetDict = [self packetGroup:packets];
for (NSNumber *packetTypeNum in packetDict.allKeys) {
NSArray<WKPacket*> *packetList = [packetDict objectForKey:packetTypeNum];
Expand All @@ -661,11 +676,16 @@ -(void) handlePackets:(NSArray<WKPacket*>*)packets {
[[WKSDK shared].chatManager handleSendack:(NSArray<WKSendackPacket*> *)packetList];
break;
case WK_RECV:
[[WKSDK shared].chatManager handleRecv:(NSArray<WKRecvPacket*> *)packetList];
[[WKSDK shared].chatManager handleRecv:(NSArray<WKRecvPacket*> *)packetList];
break;
case WK_PONG:
break;
default:
NSLog(@"未知的数据包-->[%d]",packetTypeNum.unsignedIntValue);
break;
}
}
NSLog(@"handlePackets----------------end---->%lu",packets.count);
}

-( WKConnectStatus) connectStatus {
Expand Down
25 changes: 25 additions & 0 deletions WuKongIMSDK/Classes/manager/WKMessageQueueManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//
// WKMessageQueueManager.h
// WuKongIMSDK
//
// Created by tt on 2023/11/15.
//

#import <Foundation/Foundation.h>
#import "WKMessage.h"

NS_ASSUME_NONNULL_BEGIN

@interface WKMessageQueueManager : NSObject

+ (WKMessageQueueManager *)shared;

-(void) start;

-(void) stop;

- (void)sendMessage:(WKMessage *)message;

@end

NS_ASSUME_NONNULL_END
Loading

0 comments on commit 28c5635

Please sign in to comment.