Skip to content
11 changes: 0 additions & 11 deletions docs/llm/modules/HELPERS_MODULE.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,6 @@ Converts array to query string
- **arr**: `Record<string, unknown>[]`
- **Returns**: `string`

### `getQueryBatchCursorFromTime(fromUnixTimeInMs, toUnixTimeInMs?)`
Generates batch query cursor from timestamps
- **fromUnixTimeInMs**: `number`
- **toUnixTimeInMs**: `number`
- **Returns**: Cursor string

### `getQueryModifiedCursorFromTime(unixTimeInMs)`
Generates modified query cursor from timestamp
- **unixTimeInMs**: `number`
- **Returns**: Cursor string

### `tryJsonParse<T>(json, onError?)`
Safely parses JSON with error handling
- **json**: `string`
Expand Down
126 changes: 62 additions & 64 deletions packages/apps/chat-app/src/hooks/chat/live/useChatInboxProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
DeletedHomebaseFile,
} from '@homebase-id/js-lib/core';
import {
getQueryBatchCursorFromTime,
hasDebugFlag,
} from '@homebase-id/js-lib/helpers';
import { processInbox } from '@homebase-id/js-lib/peer';
import { useQueryClient, useQuery, QueryClient } from '@tanstack/react-query';
import { useEffect } from 'react';
import { CHAT_MESSAGE_FILE_TYPE, dsrToMessage, ChatMessage } from '../../../providers/ChatProvider';
import {
ChatDrive,
Expand All @@ -20,106 +20,104 @@
} from '../../../providers/ConversationProvider';
import { insertNewConversation, invalidateConversations } from '../useConversations';
import { processChatMessagesBatch } from './useChatWebsocket';
import { invalidateChatMessages } from '../useChatMessages';

Check warning on line 23 in packages/apps/chat-app/src/hooks/chat/live/useChatInboxProcessor.ts

View workflow job for this annotation

GitHub Actions / build

'invalidateChatMessages' is defined but never used

const isDebug = hasDebugFlag();

const MINUTE_IN_MS = 60000;
const BATCH_SIZE = 2000;
// Process the inbox on startup
export const useChatInboxProcessor = (connected?: boolean) => {
const dotYouClient = useDotYouClientContext();
const queryClient = useQueryClient();

useEffect(() => {
console.log('[ChatInboxProcessor] mounted at', new Date().toISOString());
return () => console.log('[ChatInboxProcessor] unmounted at', new Date().toISOString());
}, []);

const fetchData = async () => {
const lastProcessedTime = queryClient.getQueryState(['process-chat-inbox'])?.dataUpdatedAt;
const lastProcessedWithBuffer = lastProcessedTime && lastProcessedTime - MINUTE_IN_MS * 2;
isDebug && console.log('[ChatInboxProcessor] fetchData called with cursor:', queryClient.getQueryData(['cursor-chat-inbox']), 'at', new Date().toISOString());
const lastCursor = queryClient.getQueryData(['cursor-chat-inbox']);
const shouldInvalidate = queryClient.getQueryData(['cursor-chat-inbox']) === undefined;

Check warning on line 41 in packages/apps/chat-app/src/hooks/chat/live/useChatInboxProcessor.ts

View workflow job for this annotation

GitHub Actions / build

'shouldInvalidate' is assigned a value but never used
const cursor = typeof lastCursor === 'string' ? lastCursor : null;

const processedresult = await processInbox(dotYouClient, ChatDrive, BATCH_SIZE);

Check warning on line 44 in packages/apps/chat-app/src/hooks/chat/live/useChatInboxProcessor.ts

View workflow job for this annotation

GitHub Actions / build

'processedresult' is assigned a value but never used
isDebug && console.debug('[InboxProcessor] fetching updates since', lastProcessedWithBuffer);
if (lastProcessedWithBuffer) {
const updatedMessages = await findChangesSinceTimestamp(
dotYouClient,
lastProcessedWithBuffer,
{
targetDrive: ChatDrive,
fileType: [CHAT_MESSAGE_FILE_TYPE],
}
);
isDebug && console.debug('[InboxProcessor] new messages', updatedMessages.length);
if (updatedMessages.length > 0) {
const fullMessages = (
await Promise.all(
updatedMessages.map(
async (msg) =>
await dsrToMessage(
dotYouClient,
msg as unknown as HomebaseFile<string>,
ChatDrive,
false
)
)
)
).filter(Boolean) as HomebaseFile<ChatMessage>[];
await processChatMessagesBatch(dotYouClient, queryClient, fullMessages);
}
isDebug && console.debug('[InboxProcessor] fetching updates since', cursor);

const updatedConversations = await findChangesSinceTimestamp(
dotYouClient,
lastProcessedWithBuffer,
{
targetDrive: ChatDrive,
fileType: [CHAT_CONVERSATION_FILE_TYPE],
}
);
isDebug && console.debug('[InboxProcessor] new conversations', updatedConversations.length);
await processConversationsBatch(dotYouClient, queryClient, updatedConversations);
} else {
console.warn('[useChatInboxProcessor] Invalidating all conversations & chat messages');
// We have no reference to the last time we processed the inbox, so we can only invalidate all chat messages
invalidateChatMessages(queryClient);
invalidateConversations(queryClient);
// if (shouldInvalidate)
// {
// console.warn('[useChatInboxProcessor] Invalidating all conversations & chat messages');
// // We have no reference to the last time we processed the inbox, so we can only invalidate all chat messages
// invalidateChatMessages(queryClient);
// invalidateConversations(queryClient);
// }

const updatedMessagesResult = await findChangesSinceTimestamp(
dotYouClient,
cursor,
{
targetDrive: ChatDrive,
fileType: [CHAT_MESSAGE_FILE_TYPE],
}
);
const updatedMessages = updatedMessagesResult.searchResults;
isDebug && console.debug('[InboxProcessor] new messages', updatedMessages.length);
if (updatedMessages.length > 0) {
const fullMessages = (
await Promise.all(
updatedMessages.map(
async (msg) =>
await dsrToMessage(
dotYouClient,
msg as unknown as HomebaseFile<string>,
ChatDrive,
false
)
)
)
).filter(Boolean) as HomebaseFile<ChatMessage>[];
await processChatMessagesBatch(dotYouClient, queryClient, fullMessages);
}

return processedresult;
const updatedConversationsResult = await findChangesSinceTimestamp(
dotYouClient,
cursor,
{
targetDrive: ChatDrive,
fileType: [CHAT_CONVERSATION_FILE_TYPE],
}
);
const updatedConversations = updatedConversationsResult.searchResults;
isDebug && console.debug('[InboxProcessor] new conversations', updatedConversations.length);
await processConversationsBatch(dotYouClient, queryClient, updatedConversations);
isDebug && console.log('[ChatInboxProcessor] Saving cursor:', updatedConversationsResult.cursorState ?? null);
return updatedConversationsResult.cursorState ?? null;
};

// We refetch this one on mount as each mount the websocket would reconnect, and there might be a backlog of messages
return useQuery({
queryKey: ['process-chat-inbox'],
queryKey: ['cursor-chat-inbox'],
queryFn: fetchData,
enabled: connected,
staleTime: 1000,
staleTime: 365*24*60*60*1000,
});
};

const findChangesSinceTimestamp = async (
dotYouClient: DotYouClient,
timeStamp: number,
cursor: string | null,
params: FileQueryParams
) => {
// const modifiedCursor = getQueryModifiedCursorFromTime(timeStamp); // Friday, 31 May 2024 09:38:54.678
const batchCursor = getQueryBatchCursorFromTime(new Date().getTime(), timeStamp);

const newFiles = await queryBatch(dotYouClient, params, {
maxRecords: BATCH_SIZE,
cursorState: batchCursor,
cursorState: cursor ?? undefined,
includeMetadataHeader: true,
includeTransferHistory: true,
ordering: 'newestFirst',
sorting: 'anyChangeDate',
});

// const modifiedFiles = await queryModified(dotYouClient, params, {
// maxRecords: BATCH_SIZE,
// cursor: modifiedCursor + '',
// excludePreviewThumbnail: false,
// includeHeaderContent: true,
// includeTransferHistory: true,
// });

// return modifiedFiles.searchResults.concat(newFiles.searchResults);
return newFiles.searchResults;
return newFiles;
};

const processConversationsBatch = async (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
const isDebug = hasDebugFlag();

export const useChatWebsocket = (isEnabled: boolean) => {
const queryClient = useQueryClient();

Check warning on line 37 in packages/apps/chat-app/src/hooks/chat/live/useChatWebsocket.ts

View workflow job for this annotation

GitHub Actions / build

'queryClient' is assigned a value but never used
const { chatHandler } = useChatSocketHandler();

return useWebsocketSubscriber(
Expand All @@ -50,8 +50,12 @@
'appNotificationAdded',
],
websocketDrives,
() => queryClient.invalidateQueries({ queryKey: ['process-chat-inbox'] }),
() => queryClient.invalidateQueries({ queryKey: ['process-chat-inbox'] }),
// () => {
// console.log('[ChatWebsocket] onConnect triggered at', new Date().toISOString());
// queryClient.refetchQueries({ queryKey: ['cursor-chat-inbox'] });
// },
undefined,
undefined,
'useLiveChatProcessor'
);
};
Expand Down
Loading
Loading