Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 53 additions & 57 deletions src/composables/useGetMessages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ function isAxiosErrorResponse(exception: unknown): exception is AxiosError<strin
let pollingTimeout: NodeJS.Timeout | undefined
let expirationInterval: NodeJS.Timeout | undefined
let pollingErrorTimeout = 1_000
let chatRelaySupported = false
let chatRelaySupported: boolean | null = null

/**
* Composable to provide control logic for fetching messages list
Expand All @@ -78,7 +78,6 @@ export function useGetMessagesProvider() {
const loadingNewMessages = ref(false)
const isInitialisingMessages = ref(true)
const stopFetchingOldMessages = ref(false)
let chatRelayEnabled = false

/**
* Returns whether the current participant is a participant of current conversation.
Expand Down Expand Up @@ -149,14 +148,13 @@ export function useGetMessagesProvider() {
}
if (oldToken && oldToken !== newToken) {
store.dispatch('cancelPollNewMessages', { requestId: oldToken })
stopChatRelay()
chatRelaySupported = null
}

if (newToken && canGetMessages) {
handleStartGettingMessagesPreconditions(newToken)
} else {
store.dispatch('cancelPollNewMessages', { requestId: newToken })
stopChatRelay()
}

/** Remove expired messages when joining a room */
Expand All @@ -169,8 +167,11 @@ export function useGetMessagesProvider() {
subscribe('networkOnline', handleNetworkOnline)
EventBus.on('route-change', onRouteChange)
EventBus.on('set-context-id-to-bottom', setContextIdToBottom)
EventBus.on('signaling-supported-features', checkChatRelaySupport)
EventBus.on('should-refresh-chat-messages', tryAbortChatRelay)
if (experimentalChatRelay) {
EventBus.on('signaling-message-received', addMessageFromChatRelay)
EventBus.on('signaling-supported-features', checkChatRelaySupport)
EventBus.on('should-refresh-chat-messages', tryPollNewMessages)
}

/** Every 30 seconds we remove expired messages from the store */
expirationInterval = setInterval(() => {
Expand All @@ -184,10 +185,9 @@ export function useGetMessagesProvider() {
EventBus.off('set-context-id-to-bottom', setContextIdToBottom)
EventBus.off('signaling-message-received', addMessageFromChatRelay)
EventBus.off('signaling-supported-features', checkChatRelaySupport)
EventBus.off('should-refresh-chat-messages', tryAbortChatRelay)
EventBus.off('should-refresh-chat-messages', tryPollNewMessages)

store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
stopChatRelay()
clearInterval(pollingTimeout)
clearInterval(expirationInterval)
})
Expand All @@ -208,7 +208,6 @@ export function useGetMessagesProvider() {
if (currentToken.value) {
console.debug('Canceling message request as we are offline')
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
stopChatRelay()
}
}

Expand Down Expand Up @@ -280,7 +279,7 @@ export function useGetMessagesProvider() {
messageId = nearestContextMessageId
}

if (messageId === firstContextMessageId) {
if (messageId === firstContextMessageId || !chatStore.hasEnoughMessages(token, { messageId, threadId })) {
// message is the first one in the block, try to get some messages above
isInitialisingMessages.value = true
await getOldMessages(token, false, { messageId, threadId })
Expand Down Expand Up @@ -349,8 +348,22 @@ export function useGetMessagesProvider() {

isInitialisingMessages.value = false

// Once the history is received, starts looking for new messages.
await pollNewMessages(token)
if (!experimentalChatRelay) {
pollNewMessages(token)
} else if (chatRelaySupported !== null) {
// Case: chat relay is confirmed to be supported / not supported from signaling hello message,
// but polling was not immediately triggered (e.g, when received while context request is ongoing)
pollNewMessages(token)
} else {
// Fallback polling in case signaling does not work and we will never receive Hello message
// chatRelaySupported is still null (signaling hello was not received yet)
pollingTimeout = setTimeout(() => {
if (chatRelaySupported) {
return
}
pollNewMessages(token)
}, 30_000)
}
}

/**
Expand Down Expand Up @@ -496,10 +509,6 @@ export function useGetMessagesProvider() {
* @param token token of conversation where a method was called
*/
async function pollNewMessages(token: string) {
if (chatRelayEnabled) {
// Stop polling if chat relay is supported
return
}
// Check that the token has not changed
if (currentToken.value !== token) {
console.debug(`token has changed to ${currentToken.value}, breaking the loop for ${token}`)
Expand All @@ -514,10 +523,10 @@ export function useGetMessagesProvider() {
token,
lastKnownMessageId: chatStore.getLastKnownId(token),
requestId: token,
timeout: chatRelaySupported ? 0 : undefined,
})
pollingErrorTimeout = 1_000
debugTimer.end(`${token} | long polling`, 'status 200')
tryChatRelay()
} catch (exception) {
if (Axios.isCancel(exception)) {
debugTimer.end(`${token} | long polling`, 'cancelled')
Expand All @@ -531,7 +540,9 @@ export function useGetMessagesProvider() {
// This is not an error, so reset error timeout and poll again
pollingErrorTimeout = 1_000
clearTimeout(pollingTimeout)
tryChatRelay({ force: true })
if (chatRelaySupported) {
return
}
pollingTimeout = setTimeout(() => {
pollNewMessages(token)
}, 500)
Expand All @@ -547,54 +558,54 @@ export function useGetMessagesProvider() {
console.debug('Error happened while getting chat messages. Trying again in %d seconds', pollingErrorTimeout / 1_000, exception)

clearTimeout(pollingTimeout)
if (chatRelaySupported) {
return
}
pollingTimeout = setTimeout(() => {
pollNewMessages(token)
}, pollingErrorTimeout)
return
}

clearTimeout(pollingTimeout)
if (chatRelaySupported) {
return
}
pollingTimeout = setTimeout(() => {
pollNewMessages(token)
}, 500)
}

/**
* Try to start chat relay
*
* @param options
* @param options.force - to skip end reached check when it is guaranteed
*/
function tryChatRelay(options?: { force: boolean }) {
if (chatRelaySupported && (isChatEndReached.value || options?.force)) {
startChatRelay()
function tryPollNewMessages() {
if (!chatRelaySupported) {
// the event is only relevant when chat relay is supported
return
}
pollNewMessages(currentToken.value)
}

/**
* Check whether chat relay is supported
*
* @param features
*/
function checkChatRelaySupport(features: string[]) {
if (experimentalChatRelay && features.includes('chat-relay')) {
async function checkChatRelaySupport(features: string[]) {
if (features.includes('chat-relay')) {
chatRelaySupported = true
tryChatRelay()
} else {
chatRelaySupported = false
}
}

/**
* Initialize chat relay support by stopping polling and listening to chat relay messages
*/
function startChatRelay() {
if (currentToken.value) {
// it might have been set already, ensure we cancel it
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
if (!pollingTimeout) {
// Context request is still ongoing
return
}
chatRelayEnabled = true
EventBus.on('signaling-message-received', addMessageFromChatRelay)
// Once the history and Hello signaling message is received, starts looking for new messages.
clearTimeout(pollingTimeout)
await pollNewMessages(currentToken.value)
}

/**
Expand All @@ -605,6 +616,11 @@ export function useGetMessagesProvider() {
* @param payload.message
*/
function addMessageFromChatRelay(payload: { token: string, message: ChatMessage }) {
if (!chatRelaySupported) {
// chat relay is not supported, ignore the message
return
}

const { token, message } = payload
if (token !== currentToken.value) {
// Guard: Message is for another conversation
Expand All @@ -616,26 +632,6 @@ export function useGetMessagesProvider() {
store.dispatch('processMessage', { token, message })
}

/**
* Stop chat relay and remove listener
*/
function stopChatRelay() {
chatRelayEnabled = false
EventBus.off('signaling-message-received', addMessageFromChatRelay)
}

/**
* This is needed when something went wrong after starting chat relay
* and the server is no longer sending us messages events
* so we need to abort it to continue getting messages via polling
*/
function tryAbortChatRelay() {
if (chatRelayEnabled && chatRelaySupported) {
stopChatRelay()
pollNewMessages(currentToken.value)
}
}

provide(GET_MESSAGES_CONTEXT_KEY, {
contextMessageId,
loadingOldMessages,
Expand Down
3 changes: 3 additions & 0 deletions src/services/messagesService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,14 @@ async function fetchMessages({
* @param data.lastKnownMessageId The id of the last message in the store.
* @param data.token The conversation token;
* @param [data.limit] Number of messages to load
* @param data.timeout Timeout duration for long polling
* @param [options] Axios request options
*/
async function pollNewMessages({
token,
lastKnownMessageId,
limit = 100,
timeout,
}: ReceiveMessagesPayload, options?: AxiosRequestConfig): receiveMessagesResponse {
return axios.get(generateOcsUrl('apps/spreed/api/v1/chat/{token}', { token }), {
...options,
Expand All @@ -103,6 +105,7 @@ async function pollNewMessages({
limit,
includeLastKnown: 0,
markNotificationsAsRead: 0,
timeout,
} as receiveMessagesParams,
})
}
Expand Down
4 changes: 3 additions & 1 deletion src/store/messagesStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -1046,8 +1046,9 @@ const actions = {
* @param {string} data.requestId id to identify request uniquely
* @param {object} data.requestOptions request options;
* @param {number} data.lastKnownMessageId The id of the last message in the store.
* @param data.timeout
*/
async pollNewMessages(context, { token, lastKnownMessageId, requestId, requestOptions }) {
async pollNewMessages(context, { token, lastKnownMessageId, requestId, timeout, requestOptions }) {
const actorStore = useActorStore()
context.dispatch('cancelPollNewMessages', { requestId })

Expand All @@ -1067,6 +1068,7 @@ const actions = {
token,
lastKnownMessageId,
limit: CHAT.FETCH_LIMIT,
timeout,
}, requestOptions)
context.commit('setCancelPollNewMessages', { requestId })

Expand Down
35 changes: 35 additions & 0 deletions src/stores/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,40 @@ export const useChatStore = defineStore('chat', () => {
return Math.min(...filterNumericIds(contextBlock))
}

/**
*
* Check whether there are enough messages to render in the selected context, in particular
* when there are other blocks, it means there is likely more history to load
*
* @param token The conversation token
* @param data The data object containing messageId and threadId
* @param data.messageId The message id
* @param data.threadId The thread id
*/
function hasEnoughMessages(
token: string,
{ messageId = 0, threadId = 0 }: GetMessagesListOptions = { messageId: 0, threadId: 0 },
): boolean {
let contextBlock: Set<number>
const numBlocks = (threadId ? threadBlocks[token][threadId]?.length : chatBlocks[token]?.length) ?? 0
if (numBlocks <= 1) {
// If only one block, we cannot assume there is more history to load
return true
}

if (threadId) {
contextBlock = (messageId <= 0)
? threadBlocks[token][threadId][0]
: threadBlocks[token][threadId].find((set) => set.has(messageId)) ?? threadBlocks[token][threadId][0]
} else {
contextBlock = (messageId <= 0)
? chatBlocks[token][0]
: chatBlocks[token].find((set) => set.has(messageId)) ?? chatBlocks[token][0]
}

return contextBlock.size > 10
}

/**
* Returns last known message id, belonging to current context. Defaults to given messageId
*
Expand Down Expand Up @@ -554,5 +588,6 @@ export const useChatStore = defineStore('chat', () => {
removeMessagesFromChatBlocks,
clearMessagesHistory,
purgeChatStore,
hasEnoughMessages,
}
})
5 changes: 4 additions & 1 deletion src/utils/signaling.js
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ Signaling.Internal.prototype._joinRoomSuccess = function(token, sessionId) {
this.sessionId = sessionId
this._trigger('sessionId', [this.sessionId])
this._startPullingMessages()
// Event needed to inform about chat relay support
this._trigger('supportedFeatures', [])
}

Signaling.Internal.prototype._doLeaveRoom = function(token) {
Expand Down Expand Up @@ -1113,7 +1115,6 @@ Signaling.Standalone.prototype.helloResponseReceived = function(data) {
for (i = 0; i < features.length; i++) {
this.features[features[i]] = true
}
this._trigger('supportedFeatures', features)
}

if (!this.settings.helloAuthParams.internal
Expand Down Expand Up @@ -1306,6 +1307,8 @@ Signaling.Standalone.prototype.joinResponseReceived = function(data, token) {
})
this.roomCollection.sort()
}

this._trigger('supportedFeatures', Object.keys(this.features))
}

Signaling.Standalone.prototype._doLeaveRoom = function(token) {
Expand Down
Loading