Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message history and retransmission #3199

Draft
wants to merge 28 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
957b32d
add message history and retransmission
afullerx Jun 10, 2024
ce74dca
remove console.log call
afullerx Jun 10, 2024
4754e47
move initial message ID to page render
afullerx Jun 11, 2024
61bd0ad
update docstring
afullerx Jun 11, 2024
0a5712b
add retransmit ID
afullerx Jun 13, 2024
9161d4e
minor refactor
afullerx Jun 13, 2024
7ac6136
code review
falkoschindler Jun 16, 2024
bd29dc4
Merge branch 'main' into message-retransmission
afullerx Jul 3, 2024
8606c5e
lower overhead
afullerx Jul 3, 2024
87bd9e5
add emit target filter
afullerx Jul 5, 2024
744ea93
fix on air compatibility
afullerx Jul 13, 2024
230adce
code review
falkoschindler Jul 27, 2024
dbb0f16
Merge branch 'main' into message-retransmission
falkoschindler Jul 27, 2024
0074677
fix order of history attributes
falkoschindler Jul 27, 2024
0deb85d
add missing "not" to log message
afullerx Jul 28, 2024
a3631b5
prevent incrementing _message_count for "sync" message
afullerx Jul 30, 2024
3dbbfd6
change config option to "message_history_length"
afullerx Jul 30, 2024
a8ace71
wrap message payload
afullerx Jul 30, 2024
33e890e
remove previous socket ID after sync
afullerx Jul 31, 2024
18e3d9d
Merge branch 'main' into message-retransmission
falkoschindler Oct 12, 2024
2163fe0
code review
falkoschindler Oct 12, 2024
2daa782
fix pytest fixture
falkoschindler Oct 12, 2024
8edf298
simplify retransmission by keeping sent messages in message queue
falkoschindler Oct 19, 2024
f0eb99c
Merge branch 'main' into message-retransmission
falkoschindler Oct 24, 2024
16098a8
consider maximum `message_history_length`
falkoschindler Oct 25, 2024
d3448c7
re-introduce a message_history queue to fix message ID hiccups
falkoschindler Oct 26, 2024
3408c38
cleanup and small corrections
falkoschindler Oct 26, 2024
69e8a52
don't reload on shared pages
falkoschindler Oct 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions nicegui/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(self, page: page, *, shared: bool = False) -> None:
self._has_warned_about_deleted_client = False
self.tab_id: Optional[str] = None

self.page = page
self.outbox = Outbox(self)

with Element('q-layout', _client=self).props('view="hhh lpr fff"').classes('nicegui-layout') as self.layout:
Expand All @@ -75,7 +76,6 @@ def __init__(self, page: page, *, shared: bool = False) -> None:
self._head_html = ''
self._body_html = ''

self.page = page
self.storage = ObservableDict()

self.connect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
Expand Down Expand Up @@ -122,7 +122,11 @@ def build_response(self, request: Request, status_code: int = 200) -> Response:
elements = json.dumps({
id: element._to_dict() for id, element in self.elements.items() # pylint: disable=protected-access
})
socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, 'client_id': self.id}
socket_io_js_query_params = {
**core.app.config.socket_io_js_query_params,
'client_id': self.id,
'starting_message_id': self.outbox.message_count,
}
vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values())
return templates.TemplateResponse(
request=request,
Expand Down Expand Up @@ -252,11 +256,7 @@ def handle_handshake(self) -> None:
def handle_disconnect(self) -> None:
"""Wait for the browser to reconnect; invoke disconnect handlers if it doesn't."""
async def handle_disconnect() -> None:
if self.page.reconnect_timeout is not None:
delay = self.page.reconnect_timeout
else:
delay = core.app.config.reconnect_timeout # pylint: disable=protected-access
await asyncio.sleep(delay)
await asyncio.sleep(self.page.resolve_reconnect_timeout())
for t in self.disconnect_handlers:
self.safe_invoke(t)
for t in core.app._disconnect_handlers: # pylint: disable=protected-access
Expand Down
6 changes: 4 additions & 2 deletions nicegui/nicegui.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import urllib.parse
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Dict
from typing import Any, Dict

import socketio
from fastapi import HTTPException, Request
Expand Down Expand Up @@ -163,13 +163,15 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp


@sio.on('handshake')
async def _on_handshake(sid: str, data: Dict[str, str]) -> bool:
async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool:
client = Client.instances.get(data['client_id'])
if not client:
return False
client.environ = sio.get_environ(sid)
client.tab_id = data['tab_id']
await sio.enter_room(sid, client.id)
if not client.outbox.synchronize(data['last_message_id'], data['retransmit_id']):
return False
client.handle_handshake()
return True

Expand Down
47 changes: 47 additions & 0 deletions nicegui/outbox.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import time
from collections import deque
from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple

Expand All @@ -24,11 +25,25 @@ def __init__(self, client: Client) -> None:
self.messages: Deque[Message] = deque()
self._should_stop = False
self._enqueue_event: Optional[asyncio.Event] = None
self._history: Deque[Tuple[int, float, Tuple[MessageType, Any, ClientId]]] = deque()
self._message_count: int = 0

if self.client.shared:
self._history_duration = 30
else:
dt = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout()
self._history_duration = max(dt, 30)

if core.app.is_started:
background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
else:
core.app.on_startup(self.loop)

@property
def message_count(self) -> int:
"""Total number of messages sent."""
return self._message_count

def _set_enqueue_event(self) -> None:
"""Set the enqueue event while accounting for lazy initialization."""
if self._enqueue_event:
Expand All @@ -52,6 +67,32 @@ def enqueue_message(self, message_type: MessageType, data: Any, target_id: Clien
self.messages.append((target_id, message_type, data))
self._set_enqueue_event()

def _append_history(self, message_type: MessageType, data: Any, target: ClientId) -> None:
self._message_count += 1
timestamp = time.time()
while self._history and self._history[0][1] < timestamp - self._history_duration:
self._history.popleft()
self._history.append((self._message_count, timestamp, (message_type, data, target)))

def synchronize(self, last_message_id: int, retransmit_id: str) -> bool:
"""Synchronize the state of a connecting client by resending missed messages, if possible."""
if self._history:
next_id = last_message_id + 1
oldest_id = self._history[0][0]
if oldest_id > next_id:
return False

start = next_id - oldest_id
for i in range(start, len(self._history)):
args = self._history[i][2]
args[1]['retransmit_id'] = retransmit_id
self.enqueue_message('retransmit', args, '')

elif last_message_id != self._message_count:
return False

return True

async def loop(self) -> None:
"""Send updates and messages to all clients in an endless loop."""
self._enqueue_event = asyncio.Event()
Expand Down Expand Up @@ -96,6 +137,12 @@ async def loop(self) -> None:
await asyncio.sleep(0.1)

async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
if message_type != 'retransmit':
self._append_history(message_type, data, target_id)
data['message_id'] = self._message_count
else:
message_type, data, target_id = data

await core.sio.emit(message_type, data, room=target_id)
if core.air is not None and core.air.is_air_target(target_id):
await core.air.emit(message_type, data, room=target_id)
Expand Down
6 changes: 5 additions & 1 deletion nicegui/page.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self,
:param dark: whether to use Quasar's dark mode (defaults to `dark` argument of `run` command)
:param language: language of the page (defaults to `language` argument of `run` command)
:param response_timeout: maximum time for the decorated function to build the page (default: 3.0 seconds)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 0.0 seconds)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (defaults to `reconnect_timeout` argument of `run` command))
:param api_router: APIRouter instance to use, can be left `None` to use the default
:param kwargs: additional keyword arguments passed to FastAPI's @app.get method
"""
Expand Down Expand Up @@ -89,6 +89,10 @@ def resolve_language(self) -> Optional[str]:
"""Return the language of the page."""
return self.language if self.language is not ... else core.app.config.language

def resolve_reconnect_timeout(self) -> float:
"""Return the reconnect_timeout of the page."""
return self.reconnect_timeout if self.reconnect_timeout is not None else core.app.config.reconnect_timeout

def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
core.app.remove_route(self.path) # NOTE make sure only the latest route definition is used
parameters_of_decorated_func = list(inspect.signature(func).parameters.keys())
Expand Down
23 changes: 22 additions & 1 deletion nicegui/static/nicegui.js
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ function createApp(elements, options) {
window.clientId = options.query.client_id;
const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host;
window.path_prefix = options.prefix;
window.last_message_id = options.query.starting_message_id;
window.socket = io(url, {
path: `${options.prefix}/_nicegui_ws/socket.io`,
query: options.query,
Expand All @@ -302,7 +303,14 @@ function createApp(elements, options) {
tabId = createRandomUUID();
sessionStorage.setItem("__nicegui_tab_id", tabId);
}
window.socket.emit("handshake", { client_id: window.clientId, tab_id: tabId }, (ok) => {
window.retransmitId = createRandomUUID();
const args = {
client_id: window.clientId,
tab_id: tabId,
last_message_id: window.last_message_id,
retransmit_id: window.retransmitId,
};
window.socket.emit("handshake", args, (ok) => {
if (!ok) {
console.log("reloading because handshake failed for clientId " + window.clientId);
window.location.reload();
Expand Down Expand Up @@ -351,6 +359,19 @@ function createApp(elements, options) {
let isProcessingSocketMessage = false;
for (const [event, handler] of Object.entries(messageHandlers)) {
window.socket.on(event, async (...args) => {
if (args.length > 0 && args[0].hasOwnProperty("message_id")) {
const data = args[0];
if (
data.message_id <= window.last_message_id ||
("retransmit_id" in data && data.retransmit_id != window.retransmitId)
) {
return;
}
window.last_message_id = data.message_id;
delete data.message_id;
delete data.retransmit_id;
}

socketMessageQueue.push(() => handler(...args));
if (!isProcessingSocketMessage) {
while (socketMessageQueue.length > 0) {
Expand Down