File tree Expand file tree Collapse file tree 5 files changed +46
-14
lines changed
lib/src/main/kotlin/com/wire/integrations/jvm Expand file tree Collapse file tree 5 files changed +46
-14
lines changed Original file line number Diff line number Diff line change @@ -30,8 +30,11 @@ import com.wire.integrations.jvm.model.http.client.RegisterClientResponse
30
30
import com.wire.integrations.jvm.model.http.conversation.ConversationResponse
31
31
import com.wire.integrations.jvm.model.http.user.UserResponse
32
32
import io.ktor.client.plugins.websocket.DefaultClientWebSocketSession
33
+ import java.util.UUID
33
34
34
35
interface BackendClient {
36
+ fun getCurrentSyncMarker (): UUID ?
37
+
35
38
suspend fun connectWebSocket (handleFrames : suspend (DefaultClientWebSocketSession ) -> Unit )
36
39
37
40
suspend fun getAvailableApiVersions (): ApiVersionResponse
Original file line number Diff line number Diff line change @@ -81,17 +81,29 @@ internal class BackendClientDemo(
81
81
private var cachedFeatures: FeaturesResponse ? = null
82
82
private var cachedAccessToken: String? = null
83
83
private var cachedDeviceId: String? = null
84
+ private var currentSyncMarker: UUID ? = null
85
+
86
+ override fun getCurrentSyncMarker (): UUID ? = currentSyncMarker
84
87
85
88
override suspend fun connectWebSocket (
86
89
handleFrames : suspend (DefaultClientWebSocketSession ) -> Unit
87
90
) {
88
91
logger.info(" Connecting to the webSocket, waiting for events" )
89
92
93
+ currentSyncMarker = UUID .randomUUID()
90
94
val token = loginUser()
95
+
96
+ val url = StringBuilder ().apply {
97
+ append(" /$API_VERSION /events" )
98
+ append(" ?client=$cachedDeviceId " )
99
+ append(" &access_token=$token " )
100
+ append(" &sync_marker=$currentSyncMarker " )
101
+ }.toString()
102
+
91
103
httpClient.wss(
92
104
host = IsolatedKoinContext .getApiHost()?.replace(" https://" , " " )
93
105
?.replace(" -https" , " -ssl" ),
94
- path = " / $API_VERSION /events?client= $cachedDeviceId &access_token= $token "
106
+ path = url
95
107
) {
96
108
handleFrames(this )
97
109
}
Original file line number Diff line number Diff line change @@ -39,6 +39,7 @@ import io.ktor.client.request.get
39
39
import io.ktor.client.request.header
40
40
import io.ktor.client.request.post
41
41
import io.ktor.http.HttpHeaders
42
+ import java.util.UUID
42
43
import org.slf4j.LoggerFactory
43
44
44
45
/* *
@@ -47,6 +48,8 @@ import org.slf4j.LoggerFactory
47
48
internal class BackendClientImpl (private val httpClient : HttpClient ) : BackendClient {
48
49
private val logger = LoggerFactory .getLogger(this ::class .java)
49
50
51
+ override fun getCurrentSyncMarker (): UUID ? = null
52
+
50
53
override suspend fun connectWebSocket (
51
54
handleFrames : suspend (DefaultClientWebSocketSession ) -> Unit
52
55
) {
Original file line number Diff line number Diff line change @@ -30,15 +30,15 @@ sealed class ConsumableNotificationResponse {
30
30
@SerialName(" data" ) val data : EventDataDTO
31
31
) : ConsumableNotificationResponse()
32
32
33
- @Serializable
34
- @SerialName(" message_count" )
35
- data class MessageCount (
36
- @SerialName(" data" ) val data : NotificationCount
37
- ) : ConsumableNotificationResponse()
38
-
39
33
@Serializable
40
34
@SerialName(" notifications_missed" )
41
35
data object MissedNotification : ConsumableNotificationResponse ()
36
+
37
+ @Serializable
38
+ @SerialName(" synchronization" )
39
+ data class SynchronizationNotification (
40
+ @SerialName(" data" ) val data : SynchronizationDataDTO
41
+ ) : ConsumableNotificationResponse()
42
42
}
43
43
44
44
@Serializable
@@ -50,7 +50,9 @@ data class EventDataDTO(
50
50
)
51
51
52
52
@Serializable
53
- data class NotificationCount (
54
- @SerialName(" count" )
55
- val count : ULong
53
+ data class SynchronizationDataDTO (
54
+ @SerialName(" delivery_tag" )
55
+ val deliveryTag : ULong? ,
56
+ @SerialName(" marker_id" )
57
+ val markerId : String
56
58
)
Original file line number Diff line number Diff line change @@ -89,15 +89,27 @@ internal class WireTeamEventsListener internal constructor(
89
89
}
90
90
}
91
91
92
- is ConsumableNotificationResponse .MessageCount -> {
93
- logger.info(" Websocket back online, ${notification.data.count} events to fetch" )
94
- }
95
-
96
92
is ConsumableNotificationResponse .MissedNotification -> {
97
93
logger.warn(" App was offline for too long, missed some notifications" )
98
94
val ackRequest = EventAcknowledgeRequest .notificationMissedAck()
99
95
ackEvent(ackRequest, session)
100
96
}
97
+
98
+ is ConsumableNotificationResponse .SynchronizationNotification -> {
99
+ notification.data.deliveryTag?.let { deliveryTag ->
100
+ val ackRequest = EventAcknowledgeRequest .basicAck(deliveryTag)
101
+ ackEvent(ackRequest, session)
102
+ }
103
+ val currentSyncMarker = backendClient.getCurrentSyncMarker()
104
+ if (notification.data.markerId == currentSyncMarker.toString()) {
105
+ logger.info(" Notifications are up to date since last sync marker." )
106
+ } else {
107
+ logger.debug(
108
+ " Skipping sync marker [${notification.data.markerId} ], " +
109
+ " as it is not valid for this session."
110
+ )
111
+ }
112
+ }
101
113
}
102
114
}
103
115
You can’t perform that action at this time.
0 commit comments