From 957b32d37dbbe764d426494d4c5092786ea0c9b4 Mon Sep 17 00:00:00 2001 From: aafuller Date: Mon, 10 Jun 2024 00:38:00 -0400 Subject: [PATCH 01/24] add message history and retransmission --- nicegui/client.py | 8 ++---- nicegui/nicegui.py | 6 +++-- nicegui/outbox.py | 51 +++++++++++++++++++++++++++++++++++++++ nicegui/page.py | 6 ++++- nicegui/static/nicegui.js | 28 ++++++++++++++++----- 5 files changed, 84 insertions(+), 15 deletions(-) diff --git a/nicegui/client.py b/nicegui/client.py index 4372a97b7..c8fa91a04 100644 --- a/nicegui/client.py +++ b/nicegui/client.py @@ -63,6 +63,7 @@ def __init__(self, page: page, *, shared: bool = False) -> None: self._has_warned_about_deleted_client = False self.tab_id: Optional[str] = None + self.page = page self.outbox = Outbox(self) with Element('q-layout', _client=self).props('view="hhh lpr fff"').classes('nicegui-layout') as self.layout: @@ -75,7 +76,6 @@ def __init__(self, page: page, *, shared: bool = False) -> None: self._head_html = '' self._body_html = '' - self.page = page self.storage = ObservableDict() self.connect_handlers: List[Union[Callable[..., Any], Awaitable]] = [] @@ -252,11 +252,7 @@ def handle_handshake(self) -> None: def handle_disconnect(self) -> None: """Wait for the browser to reconnect; invoke disconnect handlers if it doesn't.""" async def handle_disconnect() -> None: - if self.page.reconnect_timeout is not None: - delay = self.page.reconnect_timeout - else: - delay = core.app.config.reconnect_timeout # pylint: disable=protected-access - await asyncio.sleep(delay) + await asyncio.sleep(self.page.resolve_reconnect_timeout()) for t in self.disconnect_handlers: self.safe_invoke(t) for t in core.app._disconnect_handlers: # pylint: disable=protected-access diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index 45b562a65..2725356f0 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -3,7 +3,7 @@ import urllib.parse from contextlib import asynccontextmanager from pathlib import Path -from typing import Dict +from typing import Dict, Any import socketio from fastapi import HTTPException, Request @@ -163,13 +163,15 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp @sio.on('handshake') -async def _on_handshake(sid: str, data: Dict[str, str]) -> bool: +async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool: client = Client.instances.get(data['client_id']) if not client: return False client.environ = sio.get_environ(sid) client.tab_id = data['tab_id'] await sio.enter_room(sid, client.id) + if not client.outbox.synchronize(data['last_message_id']): + return False client.handle_handshake() return True diff --git a/nicegui/outbox.py b/nicegui/outbox.py index b65a759bf..7bee27d87 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import time from collections import deque from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple @@ -24,6 +25,15 @@ def __init__(self, client: Client) -> None: self.messages: Deque[Message] = deque() self._should_stop = False self._enqueue_event: Optional[asyncio.Event] = None + self._history: Deque[Tuple[int, float, Tuple[MessageType, Any, ClientId]]] = deque() + self._message_count: int = 0 + + if self.client.shared: + self._history_duration = 30 + else: + self._history_duration = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + \ + self.client.page.resolve_reconnect_timeout() + if core.app.is_started: background_tasks.create(self.loop(), name=f'outbox loop {client.id}') else: @@ -52,6 +62,42 @@ def enqueue_message(self, message_type: MessageType, data: Any, target_id: Clien self.messages.append((target_id, message_type, data)) self._set_enqueue_event() + def _append_history(self, message_type: MessageType, data: Any, target: ClientId) -> int: + self._message_count += 1 + current_ts = time.time() + + while len(self._history) > 0: + oldest_ts = self._history[0][1] + if current_ts - oldest_ts > self._history_duration: + self._history.popleft() + else: + break + + self._history.append((self._message_count, current_ts, (message_type, data, target))) + + return self._message_count + + def synchronize(self, last_msg_id: int) -> bool: + """Synchronize the state of a reconnecting client by resending missed messages, if possible.""" + if last_msg_id == 0: + self.enqueue_message('sync_message_id', {}, self.client.id) + return True + + if len(self._history) > 0: + next_id = last_msg_id + 1 + oldest_id = self._history[0][0] + if oldest_id > next_id: + return False + + start = next_id - oldest_id + for i in range(start, len(self._history)): + self.enqueue_message('retransmit', self._history[i][2], '') + else: + if last_msg_id != self._message_count: + return False + + return True + async def loop(self) -> None: """Send updates and messages to all clients in an endless loop.""" self._enqueue_event = asyncio.Event() @@ -96,6 +142,11 @@ async def loop(self) -> None: await asyncio.sleep(0.1) async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: + if message_type != 'retransmit': + data['message_id'] = self._append_history(message_type, data, target_id) + else: + message_type, data, target_id = data + await core.sio.emit(message_type, data, room=target_id) if core.air is not None and core.air.is_air_target(target_id): await core.air.emit(message_type, data, room=target_id) diff --git a/nicegui/page.py b/nicegui/page.py index b89b51a84..6b51795b5 100644 --- a/nicegui/page.py +++ b/nicegui/page.py @@ -51,7 +51,7 @@ def __init__(self, :param dark: whether to use Quasar's dark mode (defaults to `dark` argument of `run` command) :param language: language of the page (defaults to `language` argument of `run` command) :param response_timeout: maximum time for the decorated function to build the page (default: 3.0 seconds) - :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 0.0 seconds) + :param reconnect_timeout: maximum time the server waits for the browser to reconnect (defaults to `reconnect_timeout` argument of `run` command)) :param api_router: APIRouter instance to use, can be left `None` to use the default :param kwargs: additional keyword arguments passed to FastAPI's @app.get method """ @@ -89,6 +89,10 @@ def resolve_language(self) -> Optional[str]: """Return the language of the page.""" return self.language if self.language is not ... else core.app.config.language + def resolve_reconnect_timeout(self) -> float: + """Return the reconnect_timeout of the page.""" + return self.reconnect_timeout if self.reconnect_timeout is not None else core.app.config.reconnect_timeout + def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]: core.app.remove_route(self.path) # NOTE make sure only the latest route definition is used parameters_of_decorated_func = list(inspect.signature(func).parameters.keys()) diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index e42754659..9197f4ab7 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -289,6 +289,7 @@ function createApp(elements, options) { window.clientId = options.query.client_id; const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host; window.path_prefix = options.prefix; + window.last_message_id = 0; window.socket = io(url, { path: `${options.prefix}/_nicegui_ws/socket.io`, query: options.query, @@ -302,13 +303,17 @@ function createApp(elements, options) { tabId = createRandomUUID(); sessionStorage.setItem("__nicegui_tab_id", tabId); } - window.socket.emit("handshake", { client_id: window.clientId, tab_id: tabId }, (ok) => { - if (!ok) { - console.log("reloading because handshake failed for clientId " + window.clientId); - window.location.reload(); + window.socket.emit( + "handshake", + { client_id: window.clientId, tab_id: tabId, last_message_id: window.last_message_id }, + (ok) => { + if (!ok) { + console.log("reloading because handshake failed for clientId " + window.clientId); + window.location.reload(); + } + document.getElementById("popup").ariaHidden = true; } - document.getElementById("popup").ariaHidden = true; - }); + ); }, connect_error: (err) => { if (err.message == "timeout") { @@ -346,11 +351,22 @@ function createApp(elements, options) { }, download: (msg) => download(msg.src, msg.filename, msg.media_type, options.prefix), notify: (msg) => Quasar.Notify.create(msg), + sync_message_id: () => {}, }; const socketMessageQueue = []; let isProcessingSocketMessage = false; for (const [event, handler] of Object.entries(messageHandlers)) { window.socket.on(event, async (...args) => { + if (args.length > 0 && args[0].hasOwnProperty("message_id")) { + console.log(`ID: ${args[0].message_id}`); + if (args[0].message_id > window.last_message_id) { + window.last_message_id = args[0].message_id; + delete args[0].message_id; + } else { + return; + } + } + socketMessageQueue.push(() => handler(...args)); if (!isProcessingSocketMessage) { while (socketMessageQueue.length > 0) { From ce74dca2b1c4c602ddca44b9768e7f40003e4d0b Mon Sep 17 00:00:00 2001 From: aafuller Date: Mon, 10 Jun 2024 00:51:21 -0400 Subject: [PATCH 02/24] remove console.log call --- nicegui/static/nicegui.js | 1 - 1 file changed, 1 deletion(-) diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 9197f4ab7..33570de20 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -358,7 +358,6 @@ function createApp(elements, options) { for (const [event, handler] of Object.entries(messageHandlers)) { window.socket.on(event, async (...args) => { if (args.length > 0 && args[0].hasOwnProperty("message_id")) { - console.log(`ID: ${args[0].message_id}`); if (args[0].message_id > window.last_message_id) { window.last_message_id = args[0].message_id; delete args[0].message_id; From 4754e47c72a160095cfd0a2fb270da4ee4e372b8 Mon Sep 17 00:00:00 2001 From: aafuller Date: Mon, 10 Jun 2024 23:45:22 -0400 Subject: [PATCH 03/24] move initial message ID to page render --- nicegui/client.py | 3 ++- nicegui/outbox.py | 13 +++++++------ nicegui/static/nicegui.js | 3 +-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/nicegui/client.py b/nicegui/client.py index c8fa91a04..767031f89 100644 --- a/nicegui/client.py +++ b/nicegui/client.py @@ -122,7 +122,8 @@ def build_response(self, request: Request, status_code: int = 200) -> Response: elements = json.dumps({ id: element._to_dict() for id, element in self.elements.items() # pylint: disable=protected-access }) - socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, 'client_id': self.id} + socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, + 'client_id': self.id, 'starting_message_id': self.outbox.message_count} vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values()) return templates.TemplateResponse( request=request, diff --git a/nicegui/outbox.py b/nicegui/outbox.py index 7bee27d87..61b2611b1 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -31,14 +31,19 @@ def __init__(self, client: Client) -> None: if self.client.shared: self._history_duration = 30 else: - self._history_duration = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + \ - self.client.page.resolve_reconnect_timeout() + self._history_duration = max(core.sio.eio.ping_interval + core.sio.eio.ping_timeout + + self.client.page.resolve_reconnect_timeout(), 30) if core.app.is_started: background_tasks.create(self.loop(), name=f'outbox loop {client.id}') else: core.app.on_startup(self.loop) + @property + def message_count(self): + """Get the total number of messages sent.""" + return self._message_count + def _set_enqueue_event(self) -> None: """Set the enqueue event while accounting for lazy initialization.""" if self._enqueue_event: @@ -79,10 +84,6 @@ def _append_history(self, message_type: MessageType, data: Any, target: ClientId def synchronize(self, last_msg_id: int) -> bool: """Synchronize the state of a reconnecting client by resending missed messages, if possible.""" - if last_msg_id == 0: - self.enqueue_message('sync_message_id', {}, self.client.id) - return True - if len(self._history) > 0: next_id = last_msg_id + 1 oldest_id = self._history[0][0] diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 33570de20..7bcca9d6e 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -289,7 +289,7 @@ function createApp(elements, options) { window.clientId = options.query.client_id; const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host; window.path_prefix = options.prefix; - window.last_message_id = 0; + window.last_message_id = options.query.starting_message_id; window.socket = io(url, { path: `${options.prefix}/_nicegui_ws/socket.io`, query: options.query, @@ -351,7 +351,6 @@ function createApp(elements, options) { }, download: (msg) => download(msg.src, msg.filename, msg.media_type, options.prefix), notify: (msg) => Quasar.Notify.create(msg), - sync_message_id: () => {}, }; const socketMessageQueue = []; let isProcessingSocketMessage = false; From 61bd0ad93876c516f607d897cffbd9ff60cd5fcd Mon Sep 17 00:00:00 2001 From: aafuller Date: Tue, 11 Jun 2024 00:07:27 -0400 Subject: [PATCH 04/24] update docstring --- nicegui/outbox.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nicegui/outbox.py b/nicegui/outbox.py index 61b2611b1..d71d14f43 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -40,7 +40,7 @@ def __init__(self, client: Client) -> None: core.app.on_startup(self.loop) @property - def message_count(self): + def message_count(self) -> int: """Get the total number of messages sent.""" return self._message_count @@ -83,7 +83,7 @@ def _append_history(self, message_type: MessageType, data: Any, target: ClientId return self._message_count def synchronize(self, last_msg_id: int) -> bool: - """Synchronize the state of a reconnecting client by resending missed messages, if possible.""" + """Synchronize the state of a connecting client by resending missed messages, if possible.""" if len(self._history) > 0: next_id = last_msg_id + 1 oldest_id = self._history[0][0] From 0a5712bd8208d64905cadd11c7caa2b2a097ead4 Mon Sep 17 00:00:00 2001 From: aafuller Date: Thu, 13 Jun 2024 00:23:47 -0400 Subject: [PATCH 05/24] add retransmit ID --- nicegui/nicegui.py | 4 ++-- nicegui/outbox.py | 12 +++++++----- nicegui/static/nicegui.js | 20 ++++++++++++++++---- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index 2725356f0..41f0471d9 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -3,7 +3,7 @@ import urllib.parse from contextlib import asynccontextmanager from pathlib import Path -from typing import Dict, Any +from typing import Any, Dict import socketio from fastapi import HTTPException, Request @@ -170,7 +170,7 @@ async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool: client.environ = sio.get_environ(sid) client.tab_id = data['tab_id'] await sio.enter_room(sid, client.id) - if not client.outbox.synchronize(data['last_message_id']): + if not client.outbox.synchronize(data['last_message_id'], data['retransmit_id']): return False client.handle_handshake() return True diff --git a/nicegui/outbox.py b/nicegui/outbox.py index d71d14f43..e97e8d9b2 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -82,7 +82,7 @@ def _append_history(self, message_type: MessageType, data: Any, target: ClientId return self._message_count - def synchronize(self, last_msg_id: int) -> bool: + def synchronize(self, last_msg_id: int, retransmit_id: str) -> bool: """Synchronize the state of a connecting client by resending missed messages, if possible.""" if len(self._history) > 0: next_id = last_msg_id + 1 @@ -92,10 +92,12 @@ def synchronize(self, last_msg_id: int) -> bool: start = next_id - oldest_id for i in range(start, len(self._history)): - self.enqueue_message('retransmit', self._history[i][2], '') - else: - if last_msg_id != self._message_count: - return False + args = self._history[i][2] + args[1]['retransmit_id'] = retransmit_id + self.enqueue_message('retransmit', args, '') + + elif last_msg_id != self._message_count: + return False return True diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 7bcca9d6e..d41032be0 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -303,9 +303,15 @@ function createApp(elements, options) { tabId = createRandomUUID(); sessionStorage.setItem("__nicegui_tab_id", tabId); } + window.retransmitId = createRandomUUID(); window.socket.emit( "handshake", - { client_id: window.clientId, tab_id: tabId, last_message_id: window.last_message_id }, + { + client_id: window.clientId, + tab_id: tabId, + last_message_id: window.last_message_id, + retransmit_id: window.retransmitId, + }, (ok) => { if (!ok) { console.log("reloading because handshake failed for clientId " + window.clientId); @@ -357,11 +363,17 @@ function createApp(elements, options) { for (const [event, handler] of Object.entries(messageHandlers)) { window.socket.on(event, async (...args) => { if (args.length > 0 && args[0].hasOwnProperty("message_id")) { - if (args[0].message_id > window.last_message_id) { - window.last_message_id = args[0].message_id; - delete args[0].message_id; + const data = args[0]; + if ("retransmit_id" in data && data.retransmit_id != window.retransmitId) { + return; } else { + delete data.retransmit_id; + } + if (data.message_id <= window.last_message_id) { return; + } else { + window.last_message_id = data.message_id; + delete data.message_id; } } From 9161d4e8238e44e3f3b12897de89e391ed2385b5 Mon Sep 17 00:00:00 2001 From: aafuller Date: Thu, 13 Jun 2024 16:29:13 -0400 Subject: [PATCH 06/24] minor refactor --- nicegui/static/nicegui.js | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index d41032be0..6713b5ed9 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -364,17 +364,15 @@ function createApp(elements, options) { window.socket.on(event, async (...args) => { if (args.length > 0 && args[0].hasOwnProperty("message_id")) { const data = args[0]; - if ("retransmit_id" in data && data.retransmit_id != window.retransmitId) { + if ( + data.message_id <= window.last_message_id || + ("retransmit_id" in data && data.retransmit_id != window.retransmitId) + ) { return; - } else { - delete data.retransmit_id; - } - if (data.message_id <= window.last_message_id) { - return; - } else { - window.last_message_id = data.message_id; - delete data.message_id; } + window.last_message_id = data.message_id; + delete data.message_id; + delete data.retransmit_id; } socketMessageQueue.push(() => handler(...args)); From 7ac6136bd8af725c8e7d6ab709f312d44eb01d6c Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sun, 16 Jun 2024 16:48:07 +0200 Subject: [PATCH 07/24] code review --- nicegui/client.py | 7 +++++-- nicegui/outbox.py | 35 ++++++++++++++--------------------- nicegui/static/nicegui.js | 27 ++++++++++++--------------- 3 files changed, 31 insertions(+), 38 deletions(-) diff --git a/nicegui/client.py b/nicegui/client.py index 767031f89..7aa1c0a96 100644 --- a/nicegui/client.py +++ b/nicegui/client.py @@ -122,8 +122,11 @@ def build_response(self, request: Request, status_code: int = 200) -> Response: elements = json.dumps({ id: element._to_dict() for id, element in self.elements.items() # pylint: disable=protected-access }) - socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, - 'client_id': self.id, 'starting_message_id': self.outbox.message_count} + socket_io_js_query_params = { + **core.app.config.socket_io_js_query_params, + 'client_id': self.id, + 'starting_message_id': self.outbox.message_count, + } vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values()) return templates.TemplateResponse( request=request, diff --git a/nicegui/outbox.py b/nicegui/outbox.py index e97e8d9b2..c9f1252cc 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -31,8 +31,8 @@ def __init__(self, client: Client) -> None: if self.client.shared: self._history_duration = 30 else: - self._history_duration = max(core.sio.eio.ping_interval + core.sio.eio.ping_timeout + - self.client.page.resolve_reconnect_timeout(), 30) + dt = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout() + self._history_duration = max(dt, 30) if core.app.is_started: background_tasks.create(self.loop(), name=f'outbox loop {client.id}') @@ -41,7 +41,7 @@ def __init__(self, client: Client) -> None: @property def message_count(self) -> int: - """Get the total number of messages sent.""" + """Total number of messages sent.""" return self._message_count def _set_enqueue_event(self) -> None: @@ -67,25 +67,17 @@ def enqueue_message(self, message_type: MessageType, data: Any, target_id: Clien self.messages.append((target_id, message_type, data)) self._set_enqueue_event() - def _append_history(self, message_type: MessageType, data: Any, target: ClientId) -> int: + def _append_history(self, message_type: MessageType, data: Any, target: ClientId) -> None: self._message_count += 1 - current_ts = time.time() + timestamp = time.time() + while self._history and self._history[0][1] < timestamp - self._history_duration: + self._history.popleft() + self._history.append((self._message_count, timestamp, (message_type, data, target))) - while len(self._history) > 0: - oldest_ts = self._history[0][1] - if current_ts - oldest_ts > self._history_duration: - self._history.popleft() - else: - break - - self._history.append((self._message_count, current_ts, (message_type, data, target))) - - return self._message_count - - def synchronize(self, last_msg_id: int, retransmit_id: str) -> bool: + def synchronize(self, last_message_id: int, retransmit_id: str) -> bool: """Synchronize the state of a connecting client by resending missed messages, if possible.""" - if len(self._history) > 0: - next_id = last_msg_id + 1 + if self._history: + next_id = last_message_id + 1 oldest_id = self._history[0][0] if oldest_id > next_id: return False @@ -96,7 +88,7 @@ def synchronize(self, last_msg_id: int, retransmit_id: str) -> bool: args[1]['retransmit_id'] = retransmit_id self.enqueue_message('retransmit', args, '') - elif last_msg_id != self._message_count: + elif last_message_id != self._message_count: return False return True @@ -146,7 +138,8 @@ async def loop(self) -> None: async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: if message_type != 'retransmit': - data['message_id'] = self._append_history(message_type, data, target_id) + self._append_history(message_type, data, target_id) + data['message_id'] = self._message_count else: message_type, data, target_id = data diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 6713b5ed9..c8618eb10 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -304,22 +304,19 @@ function createApp(elements, options) { sessionStorage.setItem("__nicegui_tab_id", tabId); } window.retransmitId = createRandomUUID(); - window.socket.emit( - "handshake", - { - client_id: window.clientId, - tab_id: tabId, - last_message_id: window.last_message_id, - retransmit_id: window.retransmitId, - }, - (ok) => { - if (!ok) { - console.log("reloading because handshake failed for clientId " + window.clientId); - window.location.reload(); - } - document.getElementById("popup").ariaHidden = true; + const args = { + client_id: window.clientId, + tab_id: tabId, + last_message_id: window.last_message_id, + retransmit_id: window.retransmitId, + }; + window.socket.emit("handshake", args, (ok) => { + if (!ok) { + console.log("reloading because handshake failed for clientId " + window.clientId); + window.location.reload(); } - ); + document.getElementById("popup").ariaHidden = true; + }); }, connect_error: (err) => { if (err.message == "timeout") { From 8606c5e87c152ab77f5577f2860527a8333fe2cb Mon Sep 17 00:00:00 2001 From: aafuller Date: Wed, 3 Jul 2024 14:39:44 -0400 Subject: [PATCH 08/24] lower overhead --- nicegui/app/app_config.py | 3 +++ nicegui/nicegui.py | 10 ++++----- nicegui/outbox.py | 44 ++++++++++++++++++++------------------- nicegui/static/nicegui.js | 42 ++++++++++++++++++++++++------------- nicegui/ui_run.py | 3 +++ nicegui/ui_run_with.py | 3 +++ 6 files changed, 64 insertions(+), 41 deletions(-) diff --git a/nicegui/app/app_config.py b/nicegui/app/app_config.py index c36e8c134..f43052650 100644 --- a/nicegui/app/app_config.py +++ b/nicegui/app/app_config.py @@ -32,6 +32,7 @@ class AppConfig: language: Language = field(init=False) binding_refresh_interval: float = field(init=False) reconnect_timeout: float = field(init=False) + message_buffer_max: int = field(init=False) tailwind: bool = field(init=False) prod_js: bool = field(init=False) show_welcome_message: bool = field(init=False) @@ -47,6 +48,7 @@ def add_run_config(self, language: Language, binding_refresh_interval: float, reconnect_timeout: float, + message_buffer_max: int, tailwind: bool, prod_js: bool, show_welcome_message: bool, @@ -60,6 +62,7 @@ def add_run_config(self, self.language = language self.binding_refresh_interval = binding_refresh_interval self.reconnect_timeout = reconnect_timeout + self.message_buffer_max = message_buffer_max self.tailwind = tailwind self.prod_js = prod_js self.show_welcome_message = show_welcome_message diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index 03d088846..25c23a370 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -163,17 +163,17 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp @sio.on('handshake') -async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool: +async def _on_handshake(sid: str, data: Dict[str, Any]) -> Dict[str, Any]: client = Client.instances.get(data['client_id']) if not client: - return False + return {'success': False, 'reason': 'no_client_id'} client.environ = sio.get_environ(sid) client.tab_id = data['tab_id'] await sio.enter_room(sid, client.id) - if not client.outbox.synchronize(data['last_message_id'], data['retransmit_id']): - return False + if not await client.outbox.synchronize(data['last_message_id'], data['sync_id']): + return {'success': False, 'reason': 'sync_failure'} client.handle_handshake() - return True + return {'success': True} @sio.on('disconnect') diff --git a/nicegui/outbox.py b/nicegui/outbox.py index c9f1252cc..028baa54f 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -25,14 +25,14 @@ def __init__(self, client: Client) -> None: self.messages: Deque[Message] = deque() self._should_stop = False self._enqueue_event: Optional[asyncio.Event] = None - self._history: Deque[Tuple[int, float, Tuple[MessageType, Any, ClientId]]] = deque() + self._history: Optional[Deque[Tuple[int, float, Tuple[MessageType, Any, ClientId]]]] = None self._message_count: int = 0 if self.client.shared: self._history_duration = 30 else: - dt = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout() - self._history_duration = max(dt, 30) + connection_timeout = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self._history_duration = connection_timeout + self.client.page.resolve_reconnect_timeout() if core.app.is_started: background_tasks.create(self.loop(), name=f'outbox loop {client.id}') @@ -70,31 +70,35 @@ def enqueue_message(self, message_type: MessageType, data: Any, target_id: Clien def _append_history(self, message_type: MessageType, data: Any, target: ClientId) -> None: self._message_count += 1 timestamp = time.time() + self._history.append((self._message_count, timestamp, (message_type, data, target))) # type: ignore[union-attr] while self._history and self._history[0][1] < timestamp - self._history_duration: self._history.popleft() - self._history.append((self._message_count, timestamp, (message_type, data, target))) - def synchronize(self, last_message_id: int, retransmit_id: str) -> bool: + async def synchronize(self, last_message_id: int, sync_id: str) -> bool: """Synchronize the state of a connecting client by resending missed messages, if possible.""" - if self._history: - next_id = last_message_id + 1 - oldest_id = self._history[0][0] - if oldest_id > next_id: + messages = [] + if self._history is not None: + if self._history: + next_id = last_message_id + 1 + oldest_id = self._history[0][0] + if oldest_id > next_id: + return False + + start = next_id - oldest_id + for i in range(start, len(self._history)): + messages.append(self._history[i][2]) + + elif last_message_id != self._message_count: return False - start = next_id - oldest_id - for i in range(start, len(self._history)): - args = self._history[i][2] - args[1]['retransmit_id'] = retransmit_id - self.enqueue_message('retransmit', args, '') - - elif last_message_id != self._message_count: - return False - + await self._emit('sync', {'sync_id': sync_id, 'history': messages}, self.client.id) return True async def loop(self) -> None: """Send updates and messages to all clients in an endless loop.""" + if core.app.config.message_buffer_max: + self._history = Deque(maxlen=core.app.config.message_buffer_max) + self._enqueue_event = asyncio.Event() self._enqueue_event.set() @@ -137,11 +141,9 @@ async def loop(self) -> None: await asyncio.sleep(0.1) async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: - if message_type != 'retransmit': + if self._history is not None and message_type != 'sync': self._append_history(message_type, data, target_id) data['message_id'] = self._message_count - else: - message_type, data, target_id = data await core.sio.emit(message_type, data, room=target_id) if core.air is not None and core.air.is_air_target(target_id): diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index c8618eb10..b349d68a7 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -289,7 +289,8 @@ function createApp(elements, options) { window.clientId = options.query.client_id; const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host; window.path_prefix = options.prefix; - window.last_message_id = options.query.starting_message_id; + window.lastMessageId = options.query.starting_message_id; + window.syncing = true; window.socket = io(url, { path: `${options.prefix}/_nicegui_ws/socket.io`, query: options.query, @@ -303,17 +304,19 @@ function createApp(elements, options) { tabId = createRandomUUID(); sessionStorage.setItem("__nicegui_tab_id", tabId); } - window.retransmitId = createRandomUUID(); const args = { client_id: window.clientId, tab_id: tabId, - last_message_id: window.last_message_id, - retransmit_id: window.retransmitId, + last_message_id: window.lastMessageId, + sync_id: window.socket.id, }; - window.socket.emit("handshake", args, (ok) => { - if (!ok) { + window.socket.emit("handshake", args, (res) => { + if (!res.success && res.reason === "no_client_id") { console.log("reloading because handshake failed for clientId " + window.clientId); window.location.reload(); + } else if (!res.success && res.reason === "sync_failure") { + console.log("reloading because client could not be synchronized with the server"); + window.location.reload(); } document.getElementById("popup").ariaHidden = true; }); @@ -332,6 +335,7 @@ function createApp(elements, options) { }, disconnect: () => { document.getElementById("popup").ariaHidden = false; + window.syncing = true; }, update: async (msg) => { for (const [id, element] of Object.entries(msg)) { @@ -354,24 +358,32 @@ function createApp(elements, options) { }, download: (msg) => download(msg.src, msg.filename, msg.media_type, options.prefix), notify: (msg) => Quasar.Notify.create(msg), + sync: (msg) => { + if (msg.sync_id === window.socket.id) { + window.syncing = false; + for (let i = 0; i < msg.history.length; i++) { + let message = msg.history[i][1]; + if (message.message_id > window.lastMessageId) { + window.lastMessageId = message.message_id; + delete message.message_id; + messageHandlers[msg.history[i][0]](message); + } + } + } + }, }; const socketMessageQueue = []; let isProcessingSocketMessage = false; for (const [event, handler] of Object.entries(messageHandlers)) { window.socket.on(event, async (...args) => { - if (args.length > 0 && args[0].hasOwnProperty("message_id")) { - const data = args[0]; - if ( - data.message_id <= window.last_message_id || - ("retransmit_id" in data && data.retransmit_id != window.retransmitId) - ) { + const data = args[0]; + if (data && data.hasOwnProperty("message_id")) { + if (window.syncing || data.message_id <= window.lastMessageId) { return; } - window.last_message_id = data.message_id; + window.lastMessageId = data.message_id; delete data.message_id; - delete data.retransmit_id; } - socketMessageQueue.push(() => handler(...args)); if (!isProcessingSocketMessage) { while (socketMessageQueue.length > 0) { diff --git a/nicegui/ui_run.py b/nicegui/ui_run.py index 58a175a25..c8ed3db1c 100644 --- a/nicegui/ui_run.py +++ b/nicegui/ui_run.py @@ -31,6 +31,7 @@ def run(*, language: Language = 'en-US', binding_refresh_interval: float = 0.1, reconnect_timeout: float = 3.0, + message_buffer_max: int = 1000, show: bool = True, on_air: Optional[Union[str, Literal[True]]] = None, native: bool = False, @@ -63,6 +64,7 @@ def run(*, :param language: language for Quasar elements (default: `'en-US'`) :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly) :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds) + :param message_buffer_max: maximum number of messages that will be stored and resent after a connection interruption (default: 1000) :param show: automatically open the UI in a browser tab (default: `True`) :param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled) :param native: open the UI in a native window of size 800x600 (default: `False`, deactivates `show`, automatically finds an open port) @@ -90,6 +92,7 @@ def run(*, language=language, binding_refresh_interval=binding_refresh_interval, reconnect_timeout=reconnect_timeout, + message_buffer_max=message_buffer_max, tailwind=tailwind, prod_js=prod_js, show_welcome_message=show_welcome_message, diff --git a/nicegui/ui_run_with.py b/nicegui/ui_run_with.py index 8d69e2bba..02756c6c1 100644 --- a/nicegui/ui_run_with.py +++ b/nicegui/ui_run_with.py @@ -19,6 +19,7 @@ def run_with( language: Language = 'en-US', binding_refresh_interval: float = 0.1, reconnect_timeout: float = 3.0, + message_buffer_max: int = 1000, mount_path: str = '/', on_air: Optional[Union[str, Literal[True]]] = None, tailwind: bool = True, @@ -36,6 +37,7 @@ def run_with( :param language: language for Quasar elements (default: `'en-US'`) :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly) :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds) + :param message_buffer_max: maximum number of messages that will be stored and resent after a connection interruption (default: 1000) :param mount_path: mount NiceGUI at this path (default: `'/'`) :param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled) :param tailwind: whether to use Tailwind CSS (experimental, default: `True`) @@ -52,6 +54,7 @@ def run_with( language=language, binding_refresh_interval=binding_refresh_interval, reconnect_timeout=reconnect_timeout, + message_buffer_max=message_buffer_max, tailwind=tailwind, prod_js=prod_js, show_welcome_message=show_welcome_message, From 87bd9e5a9774bac60296995e0c7922eb1044f722 Mon Sep 17 00:00:00 2001 From: aafuller Date: Fri, 5 Jul 2024 19:37:35 -0400 Subject: [PATCH 09/24] add emit target filter --- nicegui/nicegui.py | 2 +- nicegui/outbox.py | 10 ++++++---- nicegui/static/nicegui.js | 6 ++++-- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index 25c23a370..0b788514f 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -170,7 +170,7 @@ async def _on_handshake(sid: str, data: Dict[str, Any]) -> Dict[str, Any]: client.environ = sio.get_environ(sid) client.tab_id = data['tab_id'] await sio.enter_room(sid, client.id) - if not await client.outbox.synchronize(data['last_message_id'], data['sync_id']): + if not await client.outbox.synchronize(data['last_message_id'], data['socket_ids']): return {'success': False, 'reason': 'sync_failure'} client.handle_handshake() return {'success': True} diff --git a/nicegui/outbox.py b/nicegui/outbox.py index 028baa54f..df63931dc 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -3,7 +3,7 @@ import asyncio import time from collections import deque -from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple +from typing import TYPE_CHECKING, Any, Deque, Dict, List, Optional, Tuple from . import background_tasks, core @@ -74,7 +74,7 @@ def _append_history(self, message_type: MessageType, data: Any, target: ClientId while self._history and self._history[0][1] < timestamp - self._history_duration: self._history.popleft() - async def synchronize(self, last_message_id: int, sync_id: str) -> bool: + async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> bool: """Synchronize the state of a connecting client by resending missed messages, if possible.""" messages = [] if self._history is not None: @@ -86,12 +86,14 @@ async def synchronize(self, last_message_id: int, sync_id: str) -> bool: start = next_id - oldest_id for i in range(start, len(self._history)): - messages.append(self._history[i][2]) + msg = self._history[i][2] + if msg[2] == self.client.id or msg[2] in socket_ids: + messages.append(msg) elif last_message_id != self._message_count: return False - await self._emit('sync', {'sync_id': sync_id, 'history': messages}, self.client.id) + await self._emit('sync', {'target': socket_ids[-1], 'history': messages}, self.client.id) return True async def loop(self) -> None: diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index b349d68a7..fdcdb7a84 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -291,6 +291,7 @@ function createApp(elements, options) { window.path_prefix = options.prefix; window.lastMessageId = options.query.starting_message_id; window.syncing = true; + window.socketIds = []; window.socket = io(url, { path: `${options.prefix}/_nicegui_ws/socket.io`, query: options.query, @@ -304,11 +305,12 @@ function createApp(elements, options) { tabId = createRandomUUID(); sessionStorage.setItem("__nicegui_tab_id", tabId); } + window.socketIds.push(window.socket.id); const args = { client_id: window.clientId, tab_id: tabId, last_message_id: window.lastMessageId, - sync_id: window.socket.id, + socket_ids: window.socketIds, }; window.socket.emit("handshake", args, (res) => { if (!res.success && res.reason === "no_client_id") { @@ -359,7 +361,7 @@ function createApp(elements, options) { download: (msg) => download(msg.src, msg.filename, msg.media_type, options.prefix), notify: (msg) => Quasar.Notify.create(msg), sync: (msg) => { - if (msg.sync_id === window.socket.id) { + if (msg.target === window.socket.id) { window.syncing = false; for (let i = 0; i < msg.history.length; i++) { let message = msg.history[i][1]; From 744ea93f6a9c9133fe669d3edf6dde607a5d3c18 Mon Sep 17 00:00:00 2001 From: aafuller Date: Fri, 12 Jul 2024 21:20:23 -0400 Subject: [PATCH 10/24] fix on air compatibility --- nicegui/air.py | 3 ++- nicegui/nicegui.py | 9 ++++----- nicegui/outbox.py | 24 ++++++++++++------------ nicegui/static/nicegui.js | 26 ++++++++++++++------------ nicegui/ui_run.py | 2 +- nicegui/ui_run_with.py | 2 +- 6 files changed, 34 insertions(+), 32 deletions(-) diff --git a/nicegui/air.py b/nicegui/air.py index 8f7a5ec30..9a658f846 100644 --- a/nicegui/air.py +++ b/nicegui/air.py @@ -116,7 +116,7 @@ def _handleerror(data: Dict[str, Any]) -> None: print('Error:', data['message'], flush=True) @self.relay.on('handshake') - def _handle_handshake(data: Dict[str, Any]) -> bool: + async def _handle_handshake(data: Dict[str, Any]) -> bool: client_id = data['client_id'] if client_id not in Client.instances: return False @@ -124,6 +124,7 @@ def _handle_handshake(data: Dict[str, Any]) -> bool: client.environ = data['environ'] client.tab_id = data['tab_id'] client.on_air = True + await client.outbox.synchronize(data['last_message_id'], data['socket_ids']) client.handle_handshake() return True diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index 0b788514f..d3ab63ca2 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -163,17 +163,16 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp @sio.on('handshake') -async def _on_handshake(sid: str, data: Dict[str, Any]) -> Dict[str, Any]: +async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool: client = Client.instances.get(data['client_id']) if not client: - return {'success': False, 'reason': 'no_client_id'} + return False client.environ = sio.get_environ(sid) client.tab_id = data['tab_id'] await sio.enter_room(sid, client.id) - if not await client.outbox.synchronize(data['last_message_id'], data['socket_ids']): - return {'success': False, 'reason': 'sync_failure'} + await client.outbox.synchronize(data['last_message_id'], data['socket_ids']) client.handle_handshake() - return {'success': True} + return True @sio.on('disconnect') diff --git a/nicegui/outbox.py b/nicegui/outbox.py index df63931dc..e0adc4018 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -74,27 +74,27 @@ def _append_history(self, message_type: MessageType, data: Any, target: ClientId while self._history and self._history[0][1] < timestamp - self._history_duration: self._history.popleft() - async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> bool: + async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> None: """Synchronize the state of a connecting client by resending missed messages, if possible.""" messages = [] + success = True if self._history is not None: if self._history: next_id = last_message_id + 1 oldest_id = self._history[0][0] - if oldest_id > next_id: - return False - - start = next_id - oldest_id - for i in range(start, len(self._history)): - msg = self._history[i][2] - if msg[2] == self.client.id or msg[2] in socket_ids: - messages.append(msg) + if oldest_id <= next_id: + start = next_id - oldest_id + for i in range(start, len(self._history)): + msg = self._history[i][2] + if msg[2] == self.client.id or msg[2] in socket_ids: + messages.append(msg) + else: + success = False elif last_message_id != self._message_count: - return False + success = False - await self._emit('sync', {'target': socket_ids[-1], 'history': messages}, self.client.id) - return True + await self._emit('sync', {'success': success, 'target': socket_ids[-1], 'history': messages}, self.client.id) async def loop(self) -> None: """Send updates and messages to all clients in an endless loop.""" diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index fdcdb7a84..96f571057 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -312,13 +312,10 @@ function createApp(elements, options) { last_message_id: window.lastMessageId, socket_ids: window.socketIds, }; - window.socket.emit("handshake", args, (res) => { - if (!res.success && res.reason === "no_client_id") { + window.socket.emit("handshake", args, (ok) => { + if (!ok) { console.log("reloading because handshake failed for clientId " + window.clientId); window.location.reload(); - } else if (!res.success && res.reason === "sync_failure") { - console.log("reloading because client could not be synchronized with the server"); - window.location.reload(); } document.getElementById("popup").ariaHidden = true; }); @@ -362,14 +359,19 @@ function createApp(elements, options) { notify: (msg) => Quasar.Notify.create(msg), sync: (msg) => { if (msg.target === window.socket.id) { - window.syncing = false; - for (let i = 0; i < msg.history.length; i++) { - let message = msg.history[i][1]; - if (message.message_id > window.lastMessageId) { - window.lastMessageId = message.message_id; - delete message.message_id; - messageHandlers[msg.history[i][0]](message); + if (msg.success) { + window.syncing = false; + for (let i = 0; i < msg.history.length; i++) { + let message = msg.history[i][1]; + if (message.message_id > window.lastMessageId) { + window.lastMessageId = message.message_id; + delete message.message_id; + messageHandlers[msg.history[i][0]](message); + } } + } else { + console.log("reloading because client could not be synchronized with the server"); + window.location.reload(); } } }, diff --git a/nicegui/ui_run.py b/nicegui/ui_run.py index c8ed3db1c..f48999b9c 100644 --- a/nicegui/ui_run.py +++ b/nicegui/ui_run.py @@ -64,7 +64,7 @@ def run(*, :param language: language for Quasar elements (default: `'en-US'`) :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly) :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds) - :param message_buffer_max: maximum number of messages that will be stored and resent after a connection interruption (default: 1000) + :param message_buffer_max: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable) :param show: automatically open the UI in a browser tab (default: `True`) :param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled) :param native: open the UI in a native window of size 800x600 (default: `False`, deactivates `show`, automatically finds an open port) diff --git a/nicegui/ui_run_with.py b/nicegui/ui_run_with.py index 02756c6c1..8926cb27d 100644 --- a/nicegui/ui_run_with.py +++ b/nicegui/ui_run_with.py @@ -37,7 +37,7 @@ def run_with( :param language: language for Quasar elements (default: `'en-US'`) :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly) :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds) - :param message_buffer_max: maximum number of messages that will be stored and resent after a connection interruption (default: 1000) + :param message_buffer_max: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable) :param mount_path: mount NiceGUI at this path (default: `'/'`) :param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled) :param tailwind: whether to use Tailwind CSS (experimental, default: `True`) From 230adcefa8d693cc3412536e5dcbc488147529b5 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sat, 27 Jul 2024 18:45:47 +0200 Subject: [PATCH 11/24] code review --- nicegui/outbox.py | 17 +++++++++-------- nicegui/static/nicegui.js | 27 ++++++++++++--------------- 2 files changed, 21 insertions(+), 23 deletions(-) diff --git a/nicegui/outbox.py b/nicegui/outbox.py index e0adc4018..f6ae43ef5 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -25,7 +25,7 @@ def __init__(self, client: Client) -> None: self.messages: Deque[Message] = deque() self._should_stop = False self._enqueue_event: Optional[asyncio.Event] = None - self._history: Optional[Deque[Tuple[int, float, Tuple[MessageType, Any, ClientId]]]] = None + self._history: Optional[Deque[Tuple[int, float, Message]]] = None self._message_count: int = 0 if self.client.shared: @@ -67,11 +67,11 @@ def enqueue_message(self, message_type: MessageType, data: Any, target_id: Clien self.messages.append((target_id, message_type, data)) self._set_enqueue_event() - def _append_history(self, message_type: MessageType, data: Any, target: ClientId) -> None: - self._message_count += 1 - timestamp = time.time() - self._history.append((self._message_count, timestamp, (message_type, data, target))) # type: ignore[union-attr] - while self._history and self._history[0][1] < timestamp - self._history_duration: + def _append_history(self, message: Message) -> None: + now = time.time() + assert self._history is not None + self._history.append((self._message_count, now, message)) + while self._history and self._history[0][1] < now - self._history_duration: self._history.popleft() async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> None: @@ -86,7 +86,7 @@ async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> None start = next_id - oldest_id for i in range(start, len(self._history)): msg = self._history[i][2] - if msg[2] == self.client.id or msg[2] in socket_ids: + if msg[0] == self.client.id or msg[0] in socket_ids: messages.append(msg) else: success = False @@ -143,8 +143,9 @@ async def loop(self) -> None: await asyncio.sleep(0.1) async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: + self._message_count += 1 if self._history is not None and message_type != 'sync': - self._append_history(message_type, data, target_id) + self._append_history((target_id, message_type, data)) data['message_id'] = self._message_count await core.sio.emit(message_type, data, room=target_id) diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 96f571057..61913a9b8 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -358,21 +358,18 @@ function createApp(elements, options) { download: (msg) => download(msg.src, msg.filename, msg.media_type, options.prefix), notify: (msg) => Quasar.Notify.create(msg), sync: (msg) => { - if (msg.target === window.socket.id) { - if (msg.success) { - window.syncing = false; - for (let i = 0; i < msg.history.length; i++) { - let message = msg.history[i][1]; - if (message.message_id > window.lastMessageId) { - window.lastMessageId = message.message_id; - delete message.message_id; - messageHandlers[msg.history[i][0]](message); - } - } - } else { - console.log("reloading because client could not be synchronized with the server"); - window.location.reload(); - } + if (msg.target !== window.socket.id) return; + if (!msg.success) { + console.log("Could synchronize with the server. Reloading..."); + window.location.reload(); + return; + } + window.syncing = false; + for (let [messageType, data] of msg.history) { + if (data.message_id <= window.lastMessageId) continue; + window.lastMessageId = data.message_id; + delete data.message_id; + messageHandlers[messageType](data); } }, }; From 00746778566de2e71da3d9d3c9e5dfe14296e0e6 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sat, 27 Jul 2024 19:03:35 +0200 Subject: [PATCH 12/24] fix order of history attributes --- nicegui/static/nicegui.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 05b54c853..4c465e433 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -378,7 +378,8 @@ function createApp(elements, options) { return; } window.syncing = false; - for (let [messageType, data] of msg.history) { + console.log(msg.history); + for (let [_, messageType, data] of msg.history) { if (data.message_id <= window.lastMessageId) continue; window.lastMessageId = data.message_id; delete data.message_id; From 0deb85d7eaefa661e90cb1b6fa5018bd6a0ba1e2 Mon Sep 17 00:00:00 2001 From: aafuller Date: Sun, 28 Jul 2024 02:44:42 -0400 Subject: [PATCH 13/24] add missing "not" to log message --- nicegui/static/nicegui.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 4c465e433..3ea66761b 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -373,7 +373,7 @@ function createApp(elements, options) { sync: (msg) => { if (msg.target !== window.socket.id) return; if (!msg.success) { - console.log("Could synchronize with the server. Reloading..."); + console.log("Could not synchronize with the server. Reloading..."); window.location.reload(); return; } From a3631b536d49a7264610a10c00e78eaea0cd182f Mon Sep 17 00:00:00 2001 From: aafuller Date: Tue, 30 Jul 2024 11:42:51 -0400 Subject: [PATCH 14/24] prevent incrementing _message_count for "sync" message --- nicegui/outbox.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/nicegui/outbox.py b/nicegui/outbox.py index f6ae43ef5..ad7fa3be7 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -143,10 +143,11 @@ async def loop(self) -> None: await asyncio.sleep(0.1) async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: - self._message_count += 1 - if self._history is not None and message_type != 'sync': - self._append_history((target_id, message_type, data)) - data['message_id'] = self._message_count + if message_type != 'sync': + self._message_count += 1 + if self._history is not None: + self._append_history((target_id, message_type, data)) + data['message_id'] = self._message_count await core.sio.emit(message_type, data, room=target_id) if core.air is not None and core.air.is_air_target(target_id): From 3dbbfd625031499be43615d18b82a660ae9460db Mon Sep 17 00:00:00 2001 From: aafuller Date: Tue, 30 Jul 2024 11:57:06 -0400 Subject: [PATCH 15/24] change config option to "message_history_length" --- nicegui/app/app_config.py | 6 +++--- nicegui/outbox.py | 4 ++-- nicegui/ui_run.py | 6 +++--- nicegui/ui_run_with.py | 6 +++--- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/nicegui/app/app_config.py b/nicegui/app/app_config.py index f43052650..579d586d8 100644 --- a/nicegui/app/app_config.py +++ b/nicegui/app/app_config.py @@ -32,7 +32,7 @@ class AppConfig: language: Language = field(init=False) binding_refresh_interval: float = field(init=False) reconnect_timeout: float = field(init=False) - message_buffer_max: int = field(init=False) + message_history_length: int = field(init=False) tailwind: bool = field(init=False) prod_js: bool = field(init=False) show_welcome_message: bool = field(init=False) @@ -48,7 +48,7 @@ def add_run_config(self, language: Language, binding_refresh_interval: float, reconnect_timeout: float, - message_buffer_max: int, + message_history_length: int, tailwind: bool, prod_js: bool, show_welcome_message: bool, @@ -62,7 +62,7 @@ def add_run_config(self, self.language = language self.binding_refresh_interval = binding_refresh_interval self.reconnect_timeout = reconnect_timeout - self.message_buffer_max = message_buffer_max + self.message_history_length = message_history_length self.tailwind = tailwind self.prod_js = prod_js self.show_welcome_message = show_welcome_message diff --git a/nicegui/outbox.py b/nicegui/outbox.py index ad7fa3be7..cdb848d96 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -98,8 +98,8 @@ async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> None async def loop(self) -> None: """Send updates and messages to all clients in an endless loop.""" - if core.app.config.message_buffer_max: - self._history = Deque(maxlen=core.app.config.message_buffer_max) + if core.app.config.message_history_length: + self._history = Deque(maxlen=core.app.config.message_history_length) self._enqueue_event = asyncio.Event() self._enqueue_event.set() diff --git a/nicegui/ui_run.py b/nicegui/ui_run.py index f48999b9c..abfbdb652 100644 --- a/nicegui/ui_run.py +++ b/nicegui/ui_run.py @@ -31,7 +31,7 @@ def run(*, language: Language = 'en-US', binding_refresh_interval: float = 0.1, reconnect_timeout: float = 3.0, - message_buffer_max: int = 1000, + message_history_length: int = 1000, show: bool = True, on_air: Optional[Union[str, Literal[True]]] = None, native: bool = False, @@ -64,7 +64,7 @@ def run(*, :param language: language for Quasar elements (default: `'en-US'`) :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly) :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds) - :param message_buffer_max: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable) + :param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable) :param show: automatically open the UI in a browser tab (default: `True`) :param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled) :param native: open the UI in a native window of size 800x600 (default: `False`, deactivates `show`, automatically finds an open port) @@ -92,7 +92,7 @@ def run(*, language=language, binding_refresh_interval=binding_refresh_interval, reconnect_timeout=reconnect_timeout, - message_buffer_max=message_buffer_max, + message_history_length=message_history_length, tailwind=tailwind, prod_js=prod_js, show_welcome_message=show_welcome_message, diff --git a/nicegui/ui_run_with.py b/nicegui/ui_run_with.py index 8926cb27d..a7a4e8640 100644 --- a/nicegui/ui_run_with.py +++ b/nicegui/ui_run_with.py @@ -19,7 +19,7 @@ def run_with( language: Language = 'en-US', binding_refresh_interval: float = 0.1, reconnect_timeout: float = 3.0, - message_buffer_max: int = 1000, + message_history_length: int = 1000, mount_path: str = '/', on_air: Optional[Union[str, Literal[True]]] = None, tailwind: bool = True, @@ -37,7 +37,7 @@ def run_with( :param language: language for Quasar elements (default: `'en-US'`) :param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly) :param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds) - :param message_buffer_max: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable) + :param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable) :param mount_path: mount NiceGUI at this path (default: `'/'`) :param on_air: tech preview: `allows temporary remote access `_ if set to `True` (default: disabled) :param tailwind: whether to use Tailwind CSS (experimental, default: `True`) @@ -54,7 +54,7 @@ def run_with( language=language, binding_refresh_interval=binding_refresh_interval, reconnect_timeout=reconnect_timeout, - message_buffer_max=message_buffer_max, + message_history_length=message_history_length, tailwind=tailwind, prod_js=prod_js, show_welcome_message=show_welcome_message, From a8ace7171fb2c33abb92cf9e293b946e2f13f2f7 Mon Sep 17 00:00:00 2001 From: aafuller Date: Tue, 30 Jul 2024 12:27:46 -0400 Subject: [PATCH 16/24] wrap message payload --- nicegui/outbox.py | 10 ++++++---- nicegui/static/nicegui.js | 7 +++---- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/nicegui/outbox.py b/nicegui/outbox.py index cdb848d96..eb02ccc35 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -145,13 +145,15 @@ async def loop(self) -> None: async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: if message_type != 'sync': self._message_count += 1 + message = {'message_id': self._message_count, 'payload': data} if self._history is not None: - self._append_history((target_id, message_type, data)) - data['message_id'] = self._message_count + self._append_history((target_id, message_type, message)) + else: + message = data - await core.sio.emit(message_type, data, room=target_id) + await core.sio.emit(message_type, message, room=target_id) if core.air is not None and core.air.is_air_target(target_id): - await core.air.emit(message_type, data, room=target_id) + await core.air.emit(message_type, message, room=target_id) def stop(self) -> None: """Stop the outbox loop.""" diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 3ea66761b..3df66f906 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -382,8 +382,7 @@ function createApp(elements, options) { for (let [_, messageType, data] of msg.history) { if (data.message_id <= window.lastMessageId) continue; window.lastMessageId = data.message_id; - delete data.message_id; - messageHandlers[messageType](data); + messageHandlers[messageType](data.payload); } }, }; @@ -391,13 +390,13 @@ function createApp(elements, options) { let isProcessingSocketMessage = false; for (const [event, handler] of Object.entries(messageHandlers)) { window.socket.on(event, async (...args) => { - const data = args[0]; + let data = args[0]; if (data && data.hasOwnProperty("message_id")) { if (window.syncing || data.message_id <= window.lastMessageId) { return; } window.lastMessageId = data.message_id; - delete data.message_id; + args[0] = data.payload; } socketMessageQueue.push(() => handler(...args)); if (!isProcessingSocketMessage) { From 33e890eba958a7adbe28bbfcc579ae2d59edaaef Mon Sep 17 00:00:00 2001 From: aafuller Date: Wed, 31 Jul 2024 11:17:25 -0400 Subject: [PATCH 17/24] remove previous socket ID after sync --- nicegui/static/nicegui.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 3df66f906..f6fd4ca2b 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -384,13 +384,14 @@ function createApp(elements, options) { window.lastMessageId = data.message_id; messageHandlers[messageType](data.payload); } + window.socketIds = window.socketIds.slice(-1); }, }; const socketMessageQueue = []; let isProcessingSocketMessage = false; for (const [event, handler] of Object.entries(messageHandlers)) { window.socket.on(event, async (...args) => { - let data = args[0]; + const data = args[0]; if (data && data.hasOwnProperty("message_id")) { if (window.syncing || data.message_id <= window.lastMessageId) { return; From 2163fe0e20e0190e4f832f77102958b509729f32 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sat, 12 Oct 2024 20:43:06 +0200 Subject: [PATCH 18/24] code review --- nicegui/client.py | 2 +- nicegui/outbox.py | 27 +++++++++++++-------------- nicegui/static/nicegui.js | 3 +-- 3 files changed, 15 insertions(+), 17 deletions(-) diff --git a/nicegui/client.py b/nicegui/client.py index eb5daec9a..0b894fb02 100644 --- a/nicegui/client.py +++ b/nicegui/client.py @@ -126,7 +126,7 @@ def build_response(self, request: Request, status_code: int = 200) -> Response: socket_io_js_query_params = { **core.app.config.socket_io_js_query_params, 'client_id': self.id, - 'starting_message_id': self.outbox.message_count, + 'starting_message_id': self.outbox.next_message_id, } vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values()) return templates.TemplateResponse( diff --git a/nicegui/outbox.py b/nicegui/outbox.py index eb02ccc35..e1f39c320 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -15,6 +15,7 @@ ElementId = int MessageType = str Message = Tuple[ClientId, MessageType, Any] +HistoryEntry = Tuple[int, float, Message] class Outbox: @@ -25,8 +26,8 @@ def __init__(self, client: Client) -> None: self.messages: Deque[Message] = deque() self._should_stop = False self._enqueue_event: Optional[asyncio.Event] = None - self._history: Optional[Deque[Tuple[int, float, Message]]] = None - self._message_count: int = 0 + self._history: Optional[Deque[HistoryEntry]] = None + self._next_message_id: int = 0 if self.client.shared: self._history_duration = 30 @@ -40,9 +41,9 @@ def __init__(self, client: Client) -> None: core.app.on_startup(self.loop) @property - def message_count(self) -> int: - """Total number of messages sent.""" - return self._message_count + def next_message_id(self) -> int: + """Next message ID (equals the total number of messages already sent).""" + return self._next_message_id def _set_enqueue_event(self) -> None: """Set the enqueue event while accounting for lazy initialization.""" @@ -70,7 +71,7 @@ def enqueue_message(self, message_type: MessageType, data: Any, target_id: Clien def _append_history(self, message: Message) -> None: now = time.time() assert self._history is not None - self._history.append((self._message_count, now, message)) + self._history.append((self._next_message_id, now, message)) while self._history and self._history[0][1] < now - self._history_duration: self._history.popleft() @@ -91,7 +92,7 @@ async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> None else: success = False - elif last_message_id != self._message_count: + elif last_message_id != self._next_message_id: success = False await self._emit('sync', {'success': success, 'target': socket_ids[-1], 'history': messages}, self.client.id) @@ -144,16 +145,14 @@ async def loop(self) -> None: async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: if message_type != 'sync': - self._message_count += 1 - message = {'message_id': self._message_count, 'payload': data} + self._next_message_id += 1 + data = {'message_id': self._next_message_id, 'payload': data} if self._history is not None: - self._append_history((target_id, message_type, message)) - else: - message = data + self._append_history((target_id, message_type, data)) - await core.sio.emit(message_type, message, room=target_id) + await core.sio.emit(message_type, data, room=target_id) if core.air is not None and core.air.is_air_target(target_id): - await core.air.emit(message_type, message, room=target_id) + await core.air.emit(message_type, data, room=target_id) def stop(self) -> None: """Stop the outbox loop.""" diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index bfa6fe38f..1488ad5b0 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -372,7 +372,7 @@ function createApp(elements, options) { replaceUndefinedAttributes(this.elements, id); } }, - run_javascript: (msg) => runJavascript(msg["code"], msg["request_id"]), + run_javascript: (msg) => runJavascript(msg.code, msg.request_id), open: (msg) => { const url = msg.path.startsWith("/") ? options.prefix + msg.path : msg.path; const target = msg.new_tab ? "_blank" : "_self"; @@ -388,7 +388,6 @@ function createApp(elements, options) { return; } window.syncing = false; - console.log(msg.history); for (let [_, messageType, data] of msg.history) { if (data.message_id <= window.lastMessageId) continue; window.lastMessageId = data.message_id; From 2daa782acef1b4125754f39cebf47f4fb957e246 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sat, 12 Oct 2024 20:54:55 +0200 Subject: [PATCH 19/24] fix pytest fixture --- nicegui/testing/general_fixtures.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nicegui/testing/general_fixtures.py b/nicegui/testing/general_fixtures.py index 84643fbfd..8f69b6fea 100644 --- a/nicegui/testing/general_fixtures.py +++ b/nicegui/testing/general_fixtures.py @@ -74,6 +74,7 @@ def prepare_simulation(request: pytest.FixtureRequest) -> None: language='en-US', binding_refresh_interval=0.1, reconnect_timeout=3.0, + message_history_length=1000, tailwind=True, prod_js=True, show_welcome_message=False, From 8edf2989edd3e36a6fbebda940033feca8f853b2 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sat, 19 Oct 2024 22:25:16 +0200 Subject: [PATCH 20/24] simplify retransmission by keeping sent messages in message queue --- nicegui/air.py | 4 +- nicegui/client.py | 2 +- nicegui/nicegui.py | 2 +- nicegui/outbox.py | 107 ++++++++++++++++---------------------- nicegui/static/nicegui.js | 36 +++---------- 5 files changed, 55 insertions(+), 96 deletions(-) diff --git a/nicegui/air.py b/nicegui/air.py index 2fb09ce19..399fcb9f6 100644 --- a/nicegui/air.py +++ b/nicegui/air.py @@ -124,7 +124,7 @@ def _handleerror(data: Dict[str, Any]) -> None: print('Error:', data['message'], flush=True) @self.relay.on('handshake') - async def _handle_handshake(data: Dict[str, Any]) -> bool: + def _handle_handshake(data: Dict[str, Any]) -> bool: client_id = data['client_id'] if client_id not in Client.instances: return False @@ -132,7 +132,7 @@ async def _handle_handshake(data: Dict[str, Any]) -> bool: client.environ = data['environ'] client.tab_id = data['tab_id'] client.on_air = True - await client.outbox.synchronize(data['last_message_id'], data['socket_ids']) + client.outbox.seek(data['next_message_id']) client.handle_handshake() return True diff --git a/nicegui/client.py b/nicegui/client.py index 0b894fb02..0c4c30f0c 100644 --- a/nicegui/client.py +++ b/nicegui/client.py @@ -126,7 +126,7 @@ def build_response(self, request: Request, status_code: int = 200) -> Response: socket_io_js_query_params = { **core.app.config.socket_io_js_query_params, 'client_id': self.id, - 'starting_message_id': self.outbox.next_message_id, + 'next_message_id': self.outbox.next_message_id, } vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values()) return templates.TemplateResponse( diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index fb85839fa..b47bb17ab 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -173,7 +173,7 @@ async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool: else: client.environ = sio.get_environ(sid) await sio.enter_room(sid, client.id) - await client.outbox.synchronize(data['last_message_id'], data['socket_ids']) + client.outbox.seek(data['next_message_id']) client.handle_handshake() return True diff --git a/nicegui/outbox.py b/nicegui/outbox.py index e1f39c320..dd76e1544 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -1,9 +1,10 @@ from __future__ import annotations import asyncio +import itertools import time from collections import deque -from typing import TYPE_CHECKING, Any, Deque, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple from . import background_tasks, core @@ -11,11 +12,13 @@ from .client import Client from .element import Element -ClientId = str ElementId = int +ClientId = str +MessageId = int MessageType = str -Message = Tuple[ClientId, MessageType, Any] -HistoryEntry = Tuple[int, float, Message] +MessageTime = float +Payload = Any +Message = Tuple[ClientId, MessageId, MessageTime, MessageType, Payload] class Outbox: @@ -26,14 +29,8 @@ def __init__(self, client: Client) -> None: self.messages: Deque[Message] = deque() self._should_stop = False self._enqueue_event: Optional[asyncio.Event] = None - self._history: Optional[Deque[HistoryEntry]] = None - self._next_message_id: int = 0 - - if self.client.shared: - self._history_duration = 30 - else: - connection_timeout = core.sio.eio.ping_interval + core.sio.eio.ping_timeout - self._history_duration = connection_timeout + self.client.page.resolve_reconnect_timeout() + self.next_message_id: int = 0 + self._message_index: int = 0 if core.app.is_started: background_tasks.create(self.loop(), name=f'outbox loop {client.id}') @@ -41,9 +38,12 @@ def __init__(self, client: Client) -> None: core.app.on_startup(self.loop) @property - def next_message_id(self) -> int: - """Next message ID (equals the total number of messages already sent).""" - return self._next_message_id + def history_duration(self) -> float: + """Duration of the message history in seconds.""" + if self.client.shared: + return 0 + else: + return core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout() def _set_enqueue_event(self) -> None: """Set the enqueue event while accounting for lazy initialization.""" @@ -62,46 +62,15 @@ def enqueue_delete(self, element: Element) -> None: self.updates[element.id] = None self._set_enqueue_event() - def enqueue_message(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: + def enqueue_message(self, message_type: MessageType, data: Payload, target_id: ClientId) -> None: """Enqueue a message for the given client.""" self.client.check_existence() - self.messages.append((target_id, message_type, data)) + self.messages.append((target_id, self.next_message_id, time.time(), message_type, data)) + self.next_message_id += 1 self._set_enqueue_event() - def _append_history(self, message: Message) -> None: - now = time.time() - assert self._history is not None - self._history.append((self._next_message_id, now, message)) - while self._history and self._history[0][1] < now - self._history_duration: - self._history.popleft() - - async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> None: - """Synchronize the state of a connecting client by resending missed messages, if possible.""" - messages = [] - success = True - if self._history is not None: - if self._history: - next_id = last_message_id + 1 - oldest_id = self._history[0][0] - if oldest_id <= next_id: - start = next_id - oldest_id - for i in range(start, len(self._history)): - msg = self._history[i][2] - if msg[0] == self.client.id or msg[0] in socket_ids: - messages.append(msg) - else: - success = False - - elif last_message_id != self._next_message_id: - success = False - - await self._emit('sync', {'success': success, 'target': socket_ids[-1], 'history': messages}, self.client.id) - async def loop(self) -> None: """Send updates and messages to all clients in an endless loop.""" - if core.app.config.message_history_length: - self._history = Deque(maxlen=core.app.config.message_history_length) - self._enqueue_event = asyncio.Event() self._enqueue_event.set() @@ -125,13 +94,15 @@ async def loop(self) -> None: element_id: None if element is None else element._to_dict() # pylint: disable=protected-access for element_id, element in self.updates.items() } - coros.append(self._emit('update', data, self.client.id)) + self.messages.append((self.client.id, self.next_message_id, time.time(), 'update', data)) + self.next_message_id += 1 self.updates.clear() - if self.messages: - for target_id, message_type, data in self.messages: - coros.append(self._emit(message_type, data, target_id)) - self.messages.clear() + if len(self.messages) > self._message_index: + for message in itertools.islice(self.messages, self._message_index, None): + coros.append(self._emit(message)) + self._prune_messages() + self._message_index = len(self.messages) for coro in coros: try: @@ -143,16 +114,26 @@ async def loop(self) -> None: core.app.handle_exception(e) await asyncio.sleep(0.1) - async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None: - if message_type != 'sync': - self._next_message_id += 1 - data = {'message_id': self._next_message_id, 'payload': data} - if self._history is not None: - self._append_history((target_id, message_type, data)) + async def _emit(self, message: Message) -> None: + client_id, message_id, _, message_type, data = message + data['_id'] = message_id + await core.sio.emit(message_type, data, room=client_id) + if core.air is not None and core.air.is_air_target(client_id): + await core.air.emit(message_type, data, room=client_id) - await core.sio.emit(message_type, data, room=target_id) - if core.air is not None and core.air.is_air_target(target_id): - await core.air.emit(message_type, data, room=target_id) + def _prune_messages(self) -> None: + if self.client.shared: + self.messages.clear() + else: + while self.messages and self.messages[0][2] < time.time() - self.history_duration: + self.messages.popleft() + + def seek(self, message_id: MessageId) -> None: + """Seek to the given message ID and discard all messages before it.""" + while self.messages and self.messages[0][1] < message_id: + self.messages.popleft() + self._message_index = 0 + self._set_enqueue_event() def stop(self) -> None: """Stop the outbox loop.""" diff --git a/nicegui/static/nicegui.js b/nicegui/static/nicegui.js index 1488ad5b0..6834c5d1b 100644 --- a/nicegui/static/nicegui.js +++ b/nicegui/static/nicegui.js @@ -310,9 +310,7 @@ function createApp(elements, options) { window.clientId = options.query.client_id; const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host; window.path_prefix = options.prefix; - window.lastMessageId = options.query.starting_message_id; - window.syncing = true; - window.socketIds = []; + window.nextMessageId = options.query.next_message_id; window.socket = io(url, { path: `${options.prefix}/_nicegui_ws/socket.io`, query: options.query, @@ -327,12 +325,10 @@ function createApp(elements, options) { tabId = createRandomUUID(); sessionStorage.setItem("__nicegui_tab_id", tabId); } - window.socketIds.push(window.socket.id); const args = { client_id: window.clientId, tab_id: tabId, - last_message_id: window.lastMessageId, - socket_ids: window.socketIds, + next_message_id: window.nextMessageId, }; window.socket.emit("handshake", args, (ok) => { if (!ok) { @@ -357,7 +353,6 @@ function createApp(elements, options) { }, disconnect: () => { document.getElementById("popup").ariaHidden = false; - window.syncing = true; }, update: async (msg) => { for (const [id, element] of Object.entries(msg)) { @@ -380,33 +375,16 @@ function createApp(elements, options) { }, download: (msg) => download(msg.src, msg.filename, msg.media_type, options.prefix), notify: (msg) => Quasar.Notify.create(msg), - sync: (msg) => { - if (msg.target !== window.socket.id) return; - if (!msg.success) { - console.log("Could not synchronize with the server. Reloading..."); - window.location.reload(); - return; - } - window.syncing = false; - for (let [_, messageType, data] of msg.history) { - if (data.message_id <= window.lastMessageId) continue; - window.lastMessageId = data.message_id; - messageHandlers[messageType](data.payload); - } - window.socketIds = window.socketIds.slice(-1); - }, }; const socketMessageQueue = []; let isProcessingSocketMessage = false; for (const [event, handler] of Object.entries(messageHandlers)) { window.socket.on(event, async (...args) => { - const data = args[0]; - if (data && data.hasOwnProperty("message_id")) { - if (window.syncing || data.message_id <= window.lastMessageId) { - return; - } - window.lastMessageId = data.message_id; - args[0] = data.payload; + if (args.length > 0 && args[0]._id !== undefined) { + const message_id = args[0]._id; + if (message_id < window.nextMessageId) return; + window.nextMessageId = message_id + 1; + delete args[0]._id; } socketMessageQueue.push(() => handler(...args)); if (!isProcessingSocketMessage) { From 16098a815c688cb0b28c5ca7bd3e4c263bb839b9 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Fri, 25 Oct 2024 08:46:55 +0200 Subject: [PATCH 21/24] consider maximum `message_history_length` --- nicegui/outbox.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/nicegui/outbox.py b/nicegui/outbox.py index dd76e1544..acc1898e3 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -127,6 +127,8 @@ def _prune_messages(self) -> None: else: while self.messages and self.messages[0][2] < time.time() - self.history_duration: self.messages.popleft() + while len(self.messages) > core.app.config.message_history_length: + self.messages.pop() def seek(self, message_id: MessageId) -> None: """Seek to the given message ID and discard all messages before it.""" @@ -134,6 +136,8 @@ def seek(self, message_id: MessageId) -> None: self.messages.popleft() self._message_index = 0 self._set_enqueue_event() + if not self.messages and message_id != self.next_message_id: + self.client.run_javascript('window.location.reload()') def stop(self) -> None: """Stop the outbox loop.""" From d3448c7bdac55327f26760aade159e42b8f54bb8 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sat, 26 Oct 2024 12:41:48 +0200 Subject: [PATCH 22/24] re-introduce a message_history queue to fix message ID hiccups --- nicegui/air.py | 2 +- nicegui/nicegui.py | 2 +- nicegui/outbox.py | 64 ++++++++++++++++++++++++---------------------- 3 files changed, 36 insertions(+), 32 deletions(-) diff --git a/nicegui/air.py b/nicegui/air.py index 9c6c36fed..96487b219 100644 --- a/nicegui/air.py +++ b/nicegui/air.py @@ -134,7 +134,7 @@ def _handle_handshake(data: Dict[str, Any]) -> bool: core.app.storage.copy_tab(data['old_tab_id'], data['tab_id']) client.tab_id = data['tab_id'] client.on_air = True - client.outbox.seek(data['next_message_id']) + client.outbox.try_rewind(data['next_message_id']) client.handle_handshake() return True diff --git a/nicegui/nicegui.py b/nicegui/nicegui.py index dab569d61..75c25def6 100644 --- a/nicegui/nicegui.py +++ b/nicegui/nicegui.py @@ -175,7 +175,7 @@ async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool: else: client.environ = sio.get_environ(sid) await sio.enter_room(sid, client.id) - client.outbox.seek(data['next_message_id']) + client.outbox.try_rewind(data['next_message_id']) client.handle_handshake() return True diff --git a/nicegui/outbox.py b/nicegui/outbox.py index acc1898e3..21dda88e6 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import itertools import time from collections import deque from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple @@ -13,12 +12,15 @@ from .element import Element ElementId = int + ClientId = str -MessageId = int MessageType = str -MessageTime = float Payload = Any -Message = Tuple[ClientId, MessageId, MessageTime, MessageType, Payload] +Message = Tuple[ClientId, MessageType, Payload] + +MessageId = int +MessageTime = float +HistoryEntry = Tuple[MessageId, MessageTime, Message] class Outbox: @@ -27,10 +29,10 @@ def __init__(self, client: Client) -> None: self.client = client self.updates: Dict[ElementId, Optional[Element]] = {} self.messages: Deque[Message] = deque() + self.message_history: Deque[HistoryEntry] = deque() self._should_stop = False self._enqueue_event: Optional[asyncio.Event] = None self.next_message_id: int = 0 - self._message_index: int = 0 if core.app.is_started: background_tasks.create(self.loop(), name=f'outbox loop {client.id}') @@ -65,7 +67,7 @@ def enqueue_delete(self, element: Element) -> None: def enqueue_message(self, message_type: MessageType, data: Payload, target_id: ClientId) -> None: """Enqueue a message for the given client.""" self.client.check_existence() - self.messages.append((target_id, self.next_message_id, time.time(), message_type, data)) + self.messages.append((target_id, message_type, data)) self.next_message_id += 1 self._set_enqueue_event() @@ -94,15 +96,13 @@ async def loop(self) -> None: element_id: None if element is None else element._to_dict() # pylint: disable=protected-access for element_id, element in self.updates.items() } - self.messages.append((self.client.id, self.next_message_id, time.time(), 'update', data)) - self.next_message_id += 1 + coros.append(self._emit((self.client.id, 'update', data))) self.updates.clear() - if len(self.messages) > self._message_index: - for message in itertools.islice(self.messages, self._message_index, None): + if self.messages: + for message in self.messages: coros.append(self._emit(message)) - self._prune_messages() - self._message_index = len(self.messages) + self.messages.clear() for coro in coros: try: @@ -115,28 +115,32 @@ async def loop(self) -> None: await asyncio.sleep(0.1) async def _emit(self, message: Message) -> None: - client_id, message_id, _, message_type, data = message - data['_id'] = message_id + client_id, message_type, data = message + data['_id'] = self.next_message_id + self.next_message_id += 1 + await core.sio.emit(message_type, data, room=client_id) if core.air is not None and core.air.is_air_target(client_id): await core.air.emit(message_type, data, room=client_id) - def _prune_messages(self) -> None: - if self.client.shared: - self.messages.clear() - else: - while self.messages and self.messages[0][2] < time.time() - self.history_duration: - self.messages.popleft() - while len(self.messages) > core.app.config.message_history_length: - self.messages.pop() - - def seek(self, message_id: MessageId) -> None: - """Seek to the given message ID and discard all messages before it.""" - while self.messages and self.messages[0][1] < message_id: - self.messages.popleft() - self._message_index = 0 - self._set_enqueue_event() - if not self.messages and message_id != self.next_message_id: + if not self.client.shared: + self.message_history.append((self.next_message_id, time.time(), message)) + while self.message_history and self.message_history[0][1] < time.time() - self.history_duration: + self.message_history.popleft() + while len(self.message_history) > core.app.config.message_history_length: + self.message_history.popleft() + + def try_rewind(self, target_message_id: MessageId) -> None: + """Rewind to the given message ID and discard all messages before it.""" + while self.message_history: + message_id, _, message = self.message_history.pop() + self.messages.appendleft(message) + if message_id == target_message_id: + self.message_history.clear() + self._set_enqueue_event() + self.next_message_id = message_id + return + if self.message_history: self.client.run_javascript('window.location.reload()') def stop(self) -> None: From 3408c3840e7c7fb1a9cf6aad171d6f996df72415 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sat, 26 Oct 2024 12:55:32 +0200 Subject: [PATCH 23/24] cleanup and small corrections --- nicegui/outbox.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/nicegui/outbox.py b/nicegui/outbox.py index 21dda88e6..b3c494112 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -30,23 +30,16 @@ def __init__(self, client: Client) -> None: self.updates: Dict[ElementId, Optional[Element]] = {} self.messages: Deque[Message] = deque() self.message_history: Deque[HistoryEntry] = deque() + self.next_message_id: int = 0 + self._should_stop = False self._enqueue_event: Optional[asyncio.Event] = None - self.next_message_id: int = 0 if core.app.is_started: background_tasks.create(self.loop(), name=f'outbox loop {client.id}') else: core.app.on_startup(self.loop) - @property - def history_duration(self) -> float: - """Duration of the message history in seconds.""" - if self.client.shared: - return 0 - else: - return core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout() - def _set_enqueue_event(self) -> None: """Set the enqueue event while accounting for lazy initialization.""" if self._enqueue_event: @@ -68,7 +61,6 @@ def enqueue_message(self, message_type: MessageType, data: Payload, target_id: C """Enqueue a message for the given client.""" self.client.check_existence() self.messages.append((target_id, message_type, data)) - self.next_message_id += 1 self._set_enqueue_event() async def loop(self) -> None: @@ -117,7 +109,6 @@ async def loop(self) -> None: async def _emit(self, message: Message) -> None: client_id, message_type, data = message data['_id'] = self.next_message_id - self.next_message_id += 1 await core.sio.emit(message_type, data, room=client_id) if core.air is not None and core.air.is_air_target(client_id): @@ -125,23 +116,31 @@ async def _emit(self, message: Message) -> None: if not self.client.shared: self.message_history.append((self.next_message_id, time.time(), message)) - while self.message_history and self.message_history[0][1] < time.time() - self.history_duration: + max_age = core.sio.eio.ping_interval + core.sio.eio.ping_timeout + self.client.page.resolve_reconnect_timeout() + while self.message_history and self.message_history[0][1] < time.time() - max_age: self.message_history.popleft() while len(self.message_history) > core.app.config.message_history_length: self.message_history.popleft() + self.next_message_id += 1 + def try_rewind(self, target_message_id: MessageId) -> None: """Rewind to the given message ID and discard all messages before it.""" + # nothing to do, the next message ID is already the target message ID + if self.next_message_id == target_message_id: + return + + # rewind to the target message ID while self.message_history: - message_id, _, message = self.message_history.pop() + self.next_message_id, _, message = self.message_history.pop() self.messages.appendleft(message) - if message_id == target_message_id: + if self.next_message_id == target_message_id: self.message_history.clear() self._set_enqueue_event() - self.next_message_id = message_id return - if self.message_history: - self.client.run_javascript('window.location.reload()') + + # target message ID not found, reload the page + self.client.run_javascript('window.location.reload()') def stop(self) -> None: """Stop the outbox loop.""" From 69e8a52d8c24ba723a1dc04bf71304fc40fa17a4 Mon Sep 17 00:00:00 2001 From: Falko Schindler Date: Sat, 26 Oct 2024 13:22:38 +0200 Subject: [PATCH 24/24] don't reload on shared pages --- nicegui/outbox.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nicegui/outbox.py b/nicegui/outbox.py index b3c494112..fc17f124b 100644 --- a/nicegui/outbox.py +++ b/nicegui/outbox.py @@ -140,7 +140,8 @@ def try_rewind(self, target_message_id: MessageId) -> None: return # target message ID not found, reload the page - self.client.run_javascript('window.location.reload()') + if not self.client.shared: + self.client.run_javascript('window.location.reload()') def stop(self) -> None: """Stop the outbox loop."""