Skip to content

Commit

Permalink
feat: 增加消息过期时间
Browse files Browse the repository at this point in the history
  • Loading branch information
tangtaoit committed Oct 8, 2023
1 parent 5921b50 commit 8adcc18
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 20 deletions.
3 changes: 3 additions & 0 deletions WuKongIMSDK/Assets/Migrations/202309222132.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@

alter table `message` add column expire bigint not null default 0; -- 消息过期时长
alter table `message` add column expire_at bigint not null default 0; -- 消息过期时间
3 changes: 3 additions & 0 deletions WuKongIMSDK/Classes/WKOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ typedef WKConnectInfo*_Nonnull(^WKConnectInfoCallback)(void);
// 是否追踪db日志
@property(nonatomic,assign) BOOL traceDBLog;

@property(nonatomic,assign) NSInteger expireMsgCheckInterval; // 过期消息检查间隔 单位秒
@property(nonatomic,assign) NSInteger expireMsgLimit; // 过期消息每次查询数量

@end

NS_ASSUME_NONNULL_END
4 changes: 3 additions & 1 deletion WuKongIMSDK/Classes/WKOptions.m
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ -(id) init {
self.reminderDoneUploadExpire = 60 * 60 * 24;
self.reminderRetryInterval = 10;
self.reminderRetryCount = 20;
self.expireMsgCheckInterval = 10;
self.expireMsgLimit = 50;


self.offlineMessageLimit = 300;
self.protoVersion = 0x2;
self.protoVersion = 0x3;
self.proto = WK_PROTO_WK;
self.messageFileRootDir =[[NSSearchPathForDirectoriesInDomains(NSCachesDirectory, NSUserDomainMask, YES) lastObject] stringByAppendingPathComponent:@"files"];
self.dbPrefix = WK_DB_Prefix;
Expand Down
2 changes: 2 additions & 0 deletions WuKongIMSDK/Classes/coder/WKSetting.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ NS_ASSUME_NONNULL_BEGIN

@property(nonatomic,assign) BOOL streamOn; // 是否开启流式消息

@property(nonatomic,assign) NSInteger expire; // 消息过期时长(单位秒)


-(uint8_t) toUint8;

Expand Down
2 changes: 2 additions & 0 deletions WuKongIMSDK/Classes/coder/proto/WKConnackPacket.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ NS_ASSUME_NONNULL_BEGIN

@interface WKConnackPacket : WKPacket<WKPacketBodyCoder>

// 服务端版本
@property(nonatomic,assign) uint8_t serverVersion;
// 通过客户端的RSA公钥加密的服务端DH公钥
@property(nonatomic,copy) NSString *serverKey;
// 安全吗
Expand Down
7 changes: 7 additions & 0 deletions WuKongIMSDK/Classes/coder/proto/WKConnackPacket.m
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ -(WKPacket*) decode:(NSData*) body header:(WKHeader*)header {
-(WKPacket*) decodeLM:(NSData*) body header:(WKHeader*)header {
WKConnackPacket *packet = [WKConnackPacket new];
WKDataRead *reader = [[WKDataRead alloc] initWithData:body];
if(WKSDK.shared.options.protoVersion > 3) {
packet.serverVersion = [reader readUint8];
if(packet.serverVersion < WKSDK.shared.options.protoVersion) {
WKSDK.shared.options.protoVersion = packet.serverVersion;
NSLog(@"使用协议版本:%hhu",WKSDK.shared.options.protoVersion);
}
}
packet.timeDiff = [reader readint64];
packet.reasonCode = [reader readUint8];
packet.serverKey = [reader readString];
Expand Down
3 changes: 3 additions & 0 deletions WuKongIMSDK/Classes/coder/proto/WKRecvPacket.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ typedef enum : NSUInteger {
@property(nonatomic,copy) NSString *channelId;
//频道类型(1.个人 2.群组)
@property(nonatomic,assign) uint8_t channelType;
// 消息过期时长
@property(nonatomic,assign) NSInteger expire;

// 话题
@property(nonatomic,copy) NSString *topic;
// 负荷数据
Expand Down
4 changes: 4 additions & 0 deletions WuKongIMSDK/Classes/coder/proto/WKRecvPacket.m
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ -(WKPacket*) decodeLM:(NSData*) body header:(WKHeader*)header {
packet.fromUid = [reader readString];
packet.channelId = [reader readString];
packet.channelType = [reader readUint8];
if(WKSDK.shared.options.protoVersion>=3) {
packet.expire = [reader readUint32];
}

packet.clientMsgNo = [reader readString];
if(packet.setting.streamOn) {
packet.streamNo = [reader readString];
Expand Down
2 changes: 2 additions & 0 deletions WuKongIMSDK/Classes/coder/proto/WKSendPacket.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ NS_ASSUME_NONNULL_BEGIN

@property(nonatomic,strong) WKSetting *setting;

@property(nonatomic,assign) NSInteger expire; // 消息过期时长

// 客户端提供的序列号,在客户端内唯一
@property(nonatomic,assign) uint32_t clientSeq;

Expand Down
6 changes: 5 additions & 1 deletion WuKongIMSDK/Classes/coder/proto/WKSendPacket.m
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ -(NSData*) encodeLM:(WKSendPacket*)packet{
[writer writeVariableString:packet.channelId];
// 频道类型
[writer writeUint8:packet.channelType];

if(WKSDK.shared.options.protoVersion>=3) {
// expire
[writer writeUint32:(uint32_t)packet.expire];
}

NSString *signStr = [packet veritifyString];
NSString *msgKey = [[WKSecurityManager shared] encryption:signStr];
[writer writeVariableString:[[WKSecurityManager shared] md5:msgKey]];
Expand Down
5 changes: 5 additions & 0 deletions WuKongIMSDK/Classes/db/WKMessageDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ NS_ASSUME_NONNULL_BEGIN
*/
-(void) deleteMessage:(WKMessage*)message;

-(void) deleteMessagesWithClientSeqs:(NSArray<NSNumber*>*)ids;


/**
彻底将消息从数据库删除 (deleteMessage只是标记为删除)
Expand Down Expand Up @@ -296,6 +298,9 @@ NS_ASSUME_NONNULL_BEGIN
// 获取流
-(NSArray<WKStream*>*) getStreams:(NSString*)streamNo;

// 获取过期消息
-(NSArray<WKMessage*>*) getExpireMessages:(NSInteger)limit;

@end

NS_ASSUME_NONNULL_END
50 changes: 46 additions & 4 deletions WuKongIMSDK/Classes/db/WKMessageDB.m
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#import "WKReactionDB.h"
#import "WKUnknownContent.h"
// 保存消息
#define SQL_MESSAGE_SAVE [NSString stringWithFormat:@"insert into %@(message_id,message_seq,order_seq,client_msg_no,stream_no,timestamp,from_uid,to_uid,channel_id,channel_type,content_type,content,searchable_word,voice_readed,status,reason_code,extra,setting,flame,flame_second,viewed,viewed_at,is_deleted) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",TB_MESSAGE]
#define SQL_MESSAGE_SAVE [NSString stringWithFormat:@"insert into %@(message_id,message_seq,order_seq,client_msg_no,stream_no,timestamp,from_uid,to_uid,channel_id,channel_type,content_type,content,searchable_word,voice_readed,status,reason_code,extra,setting,flame,flame_second,viewed,viewed_at,expire,expire_at,is_deleted) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",TB_MESSAGE]

// 保存或更新消息
//#define SQL_MESSAGE_REPLACE [NSString stringWithFormat:@"insert into %@(message_id,message_seq,order_seq,client_msg_no,timestamp,from_uid,to_uid,channel_id,channel_type,content_type,content,searchable_word,voice_readed,status,extra,revoke,is_deleted) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT(client_msg_no) DO UPDATE SET voice_readed=excluded.voice_readed,status=excluded.status",TB_MESSAGE]
Expand Down Expand Up @@ -84,6 +84,8 @@
// 删除指定id的消息
#define SQL_MESSAGE_DELETE_CLIENT_SEQ [NSString stringWithFormat:@"update %@ set is_deleted=1 where id=?",TB_MESSAGE]

#define SQL_MESSAGE_DELETE_CLIENT_SEQS [NSString stringWithFormat:@"update %@ set is_deleted=1 where id in ",TB_MESSAGE]

// 彻底删除消息
#define SQL_MESSAGE_DESTORY_ID [NSString stringWithFormat:@"delete from %@ where id=?",TB_MESSAGE]

Expand Down Expand Up @@ -162,6 +164,10 @@
// 查询流
#define SQL_STREAM_WITH_STREAM_NO [NSString stringWithFormat:@"select * from %@ where stream_no=? order by stream_seq asc",TB_STREAM]

// 查询过期消息
#define SQL_EXPIRE_MESSAGES [NSString stringWithFormat:@"select message.*,%@ from %@ left join message_extra on message.message_id=message_extra.message_id where message.is_deleted=0 and message.expire_at<>0 and message.expire_at <= ? order by order_seq asc limit 0,?",SQL_EXTRA_COLS,TB_MESSAGE]



@implementation WKMessageDB

Expand Down Expand Up @@ -215,7 +221,11 @@ + (WKMessageDB *)shared
}else{
orderSeq = [self getMaxOrderSeqWithChannel:db channel:message.channel]+1;
}
bool success = [db executeUpdate:SQL_MESSAGE_SAVE,@(message.messageId),@(message.messageSeq),@(orderSeq),message.clientMsgNo?:@"",message.streamNo?:@"",@(message.timestamp),message.fromUid?:@"",message.toUid?:@"",message.channel.channelId?:@"",@(message.channel.channelType),@(message.contentType),message.contentData?:@"",searchableWord?:@"",@(message.voiceReaded),@(message.status),@(message.reasonCode),[self extraToStr:message.extra],@([message.setting toUint8]),@(message.content.flame),@(message.content.flameSecond),@(message.viewed),@(message.viewedAt),@(message.isDeleted)];
NSInteger expireAt = 0;
if(message.expireAt) {
expireAt = [message.expireAt timeIntervalSince1970];
}
bool success = [db executeUpdate:SQL_MESSAGE_SAVE,@(message.messageId),@(message.messageSeq),@(orderSeq),message.clientMsgNo?:@"",message.streamNo?:@"",@(message.timestamp),message.fromUid?:@"",message.toUid?:@"",message.channel.channelId?:@"",@(message.channel.channelType),@(message.contentType),message.contentData?:@"",searchableWord?:@"",@(message.voiceReaded),@(message.status),@(message.reasonCode),[self extraToStr:message.extra],@([message.setting toUint8]),@(message.content.flame),@(message.content.flameSecond),@(message.viewed),@(message.viewedAt),@(message.expire),@(expireAt),@(message.isDeleted)];

if(success) {
message.clientSeq = (uint32_t)db.lastInsertRowId;
Expand Down Expand Up @@ -290,8 +300,11 @@ -(BOOL) insertMessage:(WKMessage*)message db:(FMDatabase*)db clientMsgNo:(NSStri
if(message.messageSeq!=0) {
orderSeq = message.messageSeq*WKOrderSeqFactor;
}

return [db executeUpdate:SQL_MESSAGE_SAVE,@(message.messageId),@(message.messageSeq),@(orderSeq),clientMsgNo,message.streamNo?:@"",@(message.timestamp),message.fromUid?:@"",message.toUid?:@"",message.channel.channelId?:@"",@(message.channel.channelType),@(message.contentType),message.contentData?:@"",searchableWord?:@"",@(message.voiceReaded),@(message.status),@(message.reasonCode),[self extraToStr:message.extra],@([message.setting toUint8]),@(message.content.flame),@(message.content.flameSecond),@(message.viewed),@(message.viewedAt),@(isDeleted)];
NSInteger expireAt = 0;
if(message.expireAt) {
expireAt = [message.expireAt timeIntervalSince1970];
}
return [db executeUpdate:SQL_MESSAGE_SAVE,@(message.messageId),@(message.messageSeq),@(orderSeq),clientMsgNo,message.streamNo?:@"",@(message.timestamp),message.fromUid?:@"",message.toUid?:@"",message.channel.channelId?:@"",@(message.channel.channelType),@(message.contentType),message.contentData?:@"",searchableWord?:@"",@(message.voiceReaded),@(message.status),@(message.reasonCode),[self extraToStr:message.extra],@([message.setting toUint8]),@(message.content.flame),@(message.content.flameSecond),@(message.viewed),@(message.viewedAt),@(message.expire),@(expireAt),@(isDeleted)];
}

-(BOOL) existMessage:(uint64_t)messageId db:(FMDatabase*)db{
Expand Down Expand Up @@ -750,6 +763,13 @@ - (void)deleteMessage:(WKMessage *)message {
}];
}

-(void) deleteMessagesWithClientSeqs:(NSArray<NSNumber*>*)ids {
[WKDB.sharedDB.dbQueue inDatabase:^(FMDatabase * _Nonnull db) {
NSString *idStrs = [ids componentsJoinedByString:@","];
[db executeUpdate:[NSString stringWithFormat:@"%@ (%@)",SQL_MESSAGE_DELETE_CLIENT_SEQS,idStrs]];
}];
}

- (void)destoryMessage:(WKMessage *)message {

[[WKDB sharedDB].dbQueue inDatabase:^(FMDatabase * _Nonnull db) {
Expand Down Expand Up @@ -840,6 +860,21 @@ -(void) updateMessageUploadingToFailStatus{
return messages;
}

-(NSArray<WKMessage*>*) getExpireMessages:(NSInteger)limit {
__block NSMutableArray *messages = [NSMutableArray array];
[WKDB.sharedDB.dbQueue inDatabase:^(FMDatabase * _Nonnull db) {
NSTimeInterval nowInterval = [[NSDate date] timeIntervalSince1970];
FMResultSet *resultSet = [db executeQuery:SQL_EXPIRE_MESSAGES,@(nowInterval),@(limit)];
while(resultSet.next) {
NSDictionary *resultDic = resultSet.resultDictionary;
[messages addObject:[self toMessage:resultDic db:db]];
}
[resultSet close];
}];
return messages;
}


-(void) updateMessageStatus:(WKMessageStatus)status withClientSeq:(uint32_t)clientSeq {
[[WKDB sharedDB].dbQueue inDatabase:^(FMDatabase * _Nonnull db) {
[db executeUpdate:SQL_MESSAGE_UPDATE_STATUS,@(status),@(clientSeq)];
Expand Down Expand Up @@ -987,6 +1022,13 @@ -(WKMessage*) toMessage:(NSDictionary*)dict db:(FMDatabase*)db{
message.localTimestamp = [[self dateFromString:dict[@"created_at"]] timeIntervalSince1970];
message.fromUid = dict[@"from_uid"];
message.toUid = dict[@"to_uid"];
if(dict[@"expire"]) {
message.expire = [dict[@"expire"] integerValue];
}
if(dict[@"expire_at"]) {
NSInteger expireAt = [dict[@"expire_at"] integerValue];
message.expireAt = [NSDate dateWithTimeIntervalSince1970:expireAt];
}
message.channel = [[WKChannel alloc] initWith:dict[@"channel_id"] channelType:[dict[@"channel_type"] integerValue]];
if(dict[@"parent_channel_id"] && ![dict[@"parent_channel_id"] isEqualToString:@""]) {
message.parentChannel = [[WKChannel alloc] initWith:dict[@"parent_channel_id"] channelType:[dict[@"parent_channel_type"] integerValue]];
Expand Down
5 changes: 5 additions & 0 deletions WuKongIMSDK/Classes/manager/WKChatManager.m
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ -(WKMessage*) saveMessage:(WKMessageContent*)content channel:(WKChannel*)channel
message.status = status;
if(setting) {
message.setting = setting;
message.expire = setting.expire;
if(setting.expire>0) {
message.expireAt = [[NSDate date] dateByAddingTimeInterval:setting.expire];
}
}
if(topic && ![topic isEqualToString:@""]) {
message.topic = topic;
Expand Down Expand Up @@ -264,6 +268,7 @@ -(WKMessage*) sendMessage:(WKMessage*)message addRetryQueue:(BOOL)addRetryQueue{
sendPacket.clientMsgNo = message.clientMsgNo;
sendPacket.channelId = message.channel.channelId;
sendPacket.channelType = message.channel.channelType;
sendPacket.expire = message.expire;
sendPacket.topic = message.topic;
sendPacket.payload = message.content.encode;

Expand Down
57 changes: 43 additions & 14 deletions WuKongIMSDK/Classes/manager/WKRetryManager.m
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ @interface WKRetryManager ()
@property(nonatomic,strong) NSTimer *retryTimer;
@property(nonatomic,strong) NSTimer *messageExtraRetryTimer;
@property(nonatomic,strong) NSTimer *reminderRetryTimer;
@property(nonatomic,strong) NSTimer *expireMsgCheckTimer;

@property(nonatomic,assign) BOOL started; // 是否已开始

@end
@implementation WKRetryManager
Expand Down Expand Up @@ -102,6 +105,10 @@ -(NSString*) getKey:(WKMessage*)message {
}

-(void) start {
if(self.started) {
return;
}
self.started = true;
__weak typeof(self) weakSelf = self;
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
[weakSelf startMessageRetry];
Expand All @@ -112,6 +119,42 @@ -(void) start {
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
[weakSelf startReminderRetry];
});

dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0), ^{
[weakSelf startReminderRetry];
});

self.expireMsgCheckTimer = [NSTimer scheduledTimerWithTimeInterval:WKSDK.shared.options.expireMsgCheckInterval target:self selector:@selector(startExpireMsgCheck) userInfo:nil repeats:YES];
}

-(void) stop {
self.started = false;
if(self.retryTimer) {
[self.retryTimer invalidate];
self.retryTimer = nil;
}
if(self.messageExtraRetryTimer) {
[self.messageExtraRetryTimer invalidate];
self.messageExtraRetryTimer = nil;
}
if(self.reminderRetryTimer) {
[self.reminderRetryTimer invalidate];
self.reminderRetryTimer = nil;
}
if(self.expireMsgCheckTimer) {
[self.expireMsgCheckTimer invalidate];
self.expireMsgCheckTimer = nil;
}
}

-(void) startExpireMsgCheck {
NSArray<WKMessage*> *messages = [WKMessageDB.shared getExpireMessages:WKSDK.shared.options.expireMsgLimit];
if(messages && messages.count>0) {
for (WKMessage *message in messages) {
[WKSDK.shared.chatManager deleteMessage:message];
}

}
}

-(void) startMessageRetry {
Expand Down Expand Up @@ -223,20 +266,6 @@ -(void) startReminderRetry {
});
}

-(void) stop {
if(self.retryTimer) {
[self.retryTimer invalidate];
self.retryTimer = nil;
}
if(self.messageExtraRetryTimer) {
[self.messageExtraRetryTimer invalidate];
self.messageExtraRetryTimer = nil;
}
if(self.reminderRetryTimer) {
[self.reminderRetryTimer invalidate];
self.reminderRetryTimer = nil;
}
}

-(void) reminderUpload {
[self.retryLock lock];
Expand Down
2 changes: 2 additions & 0 deletions WuKongIMSDK/Classes/model/WKMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ NS_ASSUME_NONNULL_BEGIN
@property(nonatomic,strong) NSMutableArray<WKStream*> *streams; // 流式消息内容
@property(nonatomic,assign) BOOL streamOn; // 是否开启了stream

@property(nonatomic,assign) NSInteger expire; // 过期时长(单位为秒)

@property(nonatomic,strong) NSDate *expireAt; // 过期时间

@end

Expand Down

0 comments on commit 8adcc18

Please sign in to comment.