Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add message history and retransmission #3199

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion nicegui/air.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,15 @@ def _handleerror(data: Dict[str, Any]) -> None:
print('Error:', data['message'], flush=True)

@self.relay.on('handshake')
def _handle_handshake(data: Dict[str, Any]) -> bool:
async def _handle_handshake(data: Dict[str, Any]) -> bool:
client_id = data['client_id']
if client_id not in Client.instances:
return False
client = Client.instances[client_id]
client.environ = data['environ']
client.tab_id = data['tab_id']
client.on_air = True
await client.outbox.synchronize(data['last_message_id'], data['socket_ids'])
client.handle_handshake()
return True

Expand Down
3 changes: 3 additions & 0 deletions nicegui/app/app_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class AppConfig:
language: Language = field(init=False)
binding_refresh_interval: float = field(init=False)
reconnect_timeout: float = field(init=False)
message_history_length: int = field(init=False)
tailwind: bool = field(init=False)
prod_js: bool = field(init=False)
show_welcome_message: bool = field(init=False)
Expand All @@ -47,6 +48,7 @@ def add_run_config(self,
language: Language,
binding_refresh_interval: float,
reconnect_timeout: float,
message_history_length: int,
tailwind: bool,
prod_js: bool,
show_welcome_message: bool,
Expand All @@ -60,6 +62,7 @@ def add_run_config(self,
self.language = language
self.binding_refresh_interval = binding_refresh_interval
self.reconnect_timeout = reconnect_timeout
self.message_history_length = message_history_length
self.tailwind = tailwind
self.prod_js = prod_js
self.show_welcome_message = show_welcome_message
Expand Down
14 changes: 7 additions & 7 deletions nicegui/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def __init__(self, page: page, *, request: Optional[Request]) -> None:
self._has_warned_about_deleted_client = False
self.tab_id: Optional[str] = None

self.page = page
self.outbox = Outbox(self)

with Element('q-layout', _client=self).props('view="hhh lpr fff"').classes('nicegui-layout') as self.layout:
Expand All @@ -76,7 +77,6 @@ def __init__(self, page: page, *, request: Optional[Request]) -> None:
self._head_html = ''
self._body_html = ''

self.page = page
self.storage = ObservableDict()

self.connect_handlers: List[Union[Callable[..., Any], Awaitable]] = []
Expand Down Expand Up @@ -123,7 +123,11 @@ def build_response(self, request: Request, status_code: int = 200) -> Response:
elements = json.dumps({
id: element._to_dict() for id, element in self.elements.items() # pylint: disable=protected-access
})
socket_io_js_query_params = {**core.app.config.socket_io_js_query_params, 'client_id': self.id}
socket_io_js_query_params = {
**core.app.config.socket_io_js_query_params,
'client_id': self.id,
'starting_message_id': self.outbox.message_count,
}
vue_html, vue_styles, vue_scripts, imports, js_imports = generate_resources(prefix, self.elements.values())
return templates.TemplateResponse(
request=request,
Expand Down Expand Up @@ -254,11 +258,7 @@ def handle_handshake(self) -> None:
def handle_disconnect(self) -> None:
"""Wait for the browser to reconnect; invoke disconnect handlers if it doesn't."""
async def handle_disconnect() -> None:
if self.page.reconnect_timeout is not None:
delay = self.page.reconnect_timeout
else:
delay = core.app.config.reconnect_timeout # pylint: disable=protected-access
await asyncio.sleep(delay)
await asyncio.sleep(self.page.resolve_reconnect_timeout())
for t in self.disconnect_handlers:
self.safe_invoke(t)
for t in core.app._disconnect_handlers: # pylint: disable=protected-access
Expand Down
5 changes: 3 additions & 2 deletions nicegui/nicegui.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import urllib.parse
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Dict
from typing import Any, Dict

import socketio
from fastapi import HTTPException, Request
Expand Down Expand Up @@ -163,13 +163,14 @@ async def _exception_handler_500(request: Request, exception: Exception) -> Resp


@sio.on('handshake')
async def _on_handshake(sid: str, data: Dict[str, str]) -> bool:
async def _on_handshake(sid: str, data: Dict[str, Any]) -> bool:
client = Client.instances.get(data['client_id'])
if not client:
return False
client.environ = sio.get_environ(sid)
client.tab_id = data['tab_id']
await sio.enter_room(sid, client.id)
await client.outbox.synchronize(data['last_message_id'], data['socket_ids'])
client.handle_handshake()
return True

Expand Down
61 changes: 58 additions & 3 deletions nicegui/outbox.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import asyncio
import time
from collections import deque
from typing import TYPE_CHECKING, Any, Deque, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Any, Deque, Dict, List, Optional, Tuple

from . import background_tasks, core

Expand All @@ -24,11 +25,25 @@ def __init__(self, client: Client) -> None:
self.messages: Deque[Message] = deque()
self._should_stop = False
self._enqueue_event: Optional[asyncio.Event] = None
self._history: Optional[Deque[Tuple[int, float, Message]]] = None
self._message_count: int = 0

if self.client.shared:
self._history_duration = 30
else:
connection_timeout = core.sio.eio.ping_interval + core.sio.eio.ping_timeout
self._history_duration = connection_timeout + self.client.page.resolve_reconnect_timeout()

if core.app.is_started:
background_tasks.create(self.loop(), name=f'outbox loop {client.id}')
else:
core.app.on_startup(self.loop)

@property
def message_count(self) -> int:
"""Total number of messages sent."""
return self._message_count

def _set_enqueue_event(self) -> None:
"""Set the enqueue event while accounting for lazy initialization."""
if self._enqueue_event:
Expand All @@ -52,8 +67,40 @@ def enqueue_message(self, message_type: MessageType, data: Any, target_id: Clien
self.messages.append((target_id, message_type, data))
self._set_enqueue_event()

def _append_history(self, message: Message) -> None:
now = time.time()
assert self._history is not None
self._history.append((self._message_count, now, message))
while self._history and self._history[0][1] < now - self._history_duration:
self._history.popleft()

async def synchronize(self, last_message_id: int, socket_ids: List[str]) -> None:
"""Synchronize the state of a connecting client by resending missed messages, if possible."""
messages = []
success = True
if self._history is not None:
if self._history:
next_id = last_message_id + 1
oldest_id = self._history[0][0]
if oldest_id <= next_id:
start = next_id - oldest_id
for i in range(start, len(self._history)):
msg = self._history[i][2]
if msg[0] == self.client.id or msg[0] in socket_ids:
messages.append(msg)
else:
success = False

elif last_message_id != self._message_count:
success = False

await self._emit('sync', {'success': success, 'target': socket_ids[-1], 'history': messages}, self.client.id)

async def loop(self) -> None:
"""Send updates and messages to all clients in an endless loop."""
if core.app.config.message_history_length:
self._history = Deque(maxlen=core.app.config.message_history_length)

self._enqueue_event = asyncio.Event()
self._enqueue_event.set()

Expand Down Expand Up @@ -96,9 +143,17 @@ async def loop(self) -> None:
await asyncio.sleep(0.1)

async def _emit(self, message_type: MessageType, data: Any, target_id: ClientId) -> None:
await core.sio.emit(message_type, data, room=target_id)
if message_type != 'sync':
self._message_count += 1
message = {'message_id': self._message_count, 'payload': data}
if self._history is not None:
self._append_history((target_id, message_type, message))
else:
message = data

await core.sio.emit(message_type, message, room=target_id)
if core.air is not None and core.air.is_air_target(target_id):
await core.air.emit(message_type, data, room=target_id)
await core.air.emit(message_type, message, room=target_id)

def stop(self) -> None:
"""Stop the outbox loop."""
Expand Down
6 changes: 5 additions & 1 deletion nicegui/page.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __init__(self,
:param dark: whether to use Quasar's dark mode (defaults to `dark` argument of `run` command)
:param language: language of the page (defaults to `language` argument of `run` command)
:param response_timeout: maximum time for the decorated function to build the page (default: 3.0 seconds)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 0.0 seconds)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (defaults to `reconnect_timeout` argument of `run` command))
:param api_router: APIRouter instance to use, can be left `None` to use the default
:param kwargs: additional keyword arguments passed to FastAPI's @app.get method
"""
Expand Down Expand Up @@ -89,6 +89,10 @@ def resolve_language(self) -> Optional[str]:
"""Return the language of the page."""
return self.language if self.language is not ... else core.app.config.language

def resolve_reconnect_timeout(self) -> float:
"""Return the reconnect_timeout of the page."""
return self.reconnect_timeout if self.reconnect_timeout is not None else core.app.config.reconnect_timeout

def __call__(self, func: Callable[..., Any]) -> Callable[..., Any]:
core.app.remove_route(self.path) # NOTE make sure only the latest route definition is used
parameters_of_decorated_func = list(inspect.signature(func).parameters.keys())
Expand Down
37 changes: 36 additions & 1 deletion nicegui/static/nicegui.js
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ function createApp(elements, options) {
window.clientId = options.query.client_id;
const url = window.location.protocol === "https:" ? "wss://" : "ws://" + window.location.host;
window.path_prefix = options.prefix;
window.lastMessageId = options.query.starting_message_id;
window.syncing = true;
window.socketIds = [];
window.socket = io(url, {
path: `${options.prefix}/_nicegui_ws/socket.io`,
query: options.query,
Expand All @@ -315,7 +318,14 @@ function createApp(elements, options) {
tabId = createRandomUUID();
sessionStorage.setItem("__nicegui_tab_id", tabId);
}
window.socket.emit("handshake", { client_id: window.clientId, tab_id: tabId }, (ok) => {
window.socketIds.push(window.socket.id);
const args = {
client_id: window.clientId,
tab_id: tabId,
last_message_id: window.lastMessageId,
socket_ids: window.socketIds,
};
window.socket.emit("handshake", args, (ok) => {
if (!ok) {
console.log("reloading because handshake failed for clientId " + window.clientId);
window.location.reload();
Expand All @@ -337,6 +347,7 @@ function createApp(elements, options) {
},
disconnect: () => {
document.getElementById("popup").ariaHidden = false;
window.syncing = true;
},
update: async (msg) => {
for (const [id, element] of Object.entries(msg)) {
Expand All @@ -359,11 +370,35 @@ function createApp(elements, options) {
},
download: (msg) => download(msg.src, msg.filename, msg.media_type, options.prefix),
notify: (msg) => Quasar.Notify.create(msg),
sync: (msg) => {
if (msg.target !== window.socket.id) return;
if (!msg.success) {
console.log("Could not synchronize with the server. Reloading...");
window.location.reload();
return;
}
window.syncing = false;
console.log(msg.history);
for (let [_, messageType, data] of msg.history) {
if (data.message_id <= window.lastMessageId) continue;
window.lastMessageId = data.message_id;
messageHandlers[messageType](data.payload);
}
window.socketIds = window.socketIds.slice(-1);
},
};
const socketMessageQueue = [];
let isProcessingSocketMessage = false;
for (const [event, handler] of Object.entries(messageHandlers)) {
window.socket.on(event, async (...args) => {
const data = args[0];
if (data && data.hasOwnProperty("message_id")) {
if (window.syncing || data.message_id <= window.lastMessageId) {
return;
}
window.lastMessageId = data.message_id;
args[0] = data.payload;
}
socketMessageQueue.push(() => handler(...args));
if (!isProcessingSocketMessage) {
while (socketMessageQueue.length > 0) {
Expand Down
3 changes: 3 additions & 0 deletions nicegui/ui_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def run(*,
language: Language = 'en-US',
binding_refresh_interval: float = 0.1,
reconnect_timeout: float = 3.0,
message_history_length: int = 1000,
show: bool = True,
on_air: Optional[Union[str, Literal[True]]] = None,
native: bool = False,
Expand Down Expand Up @@ -63,6 +64,7 @@ def run(*,
:param language: language for Quasar elements (default: `'en-US'`)
:param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds)
:param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable)
:param show: automatically open the UI in a browser tab (default: `True`)
:param on_air: tech preview: `allows temporary remote access <https://nicegui.io/documentation/section_configuration_deployment#nicegui_on_air>`_ if set to `True` (default: disabled)
:param native: open the UI in a native window of size 800x600 (default: `False`, deactivates `show`, automatically finds an open port)
Expand Down Expand Up @@ -90,6 +92,7 @@ def run(*,
language=language,
binding_refresh_interval=binding_refresh_interval,
reconnect_timeout=reconnect_timeout,
message_history_length=message_history_length,
tailwind=tailwind,
prod_js=prod_js,
show_welcome_message=show_welcome_message,
Expand Down
3 changes: 3 additions & 0 deletions nicegui/ui_run_with.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def run_with(
language: Language = 'en-US',
binding_refresh_interval: float = 0.1,
reconnect_timeout: float = 3.0,
message_history_length: int = 1000,
mount_path: str = '/',
on_air: Optional[Union[str, Literal[True]]] = None,
tailwind: bool = True,
Expand All @@ -36,6 +37,7 @@ def run_with(
:param language: language for Quasar elements (default: `'en-US'`)
:param binding_refresh_interval: time between binding updates (default: `0.1` seconds, bigger is more CPU friendly)
:param reconnect_timeout: maximum time the server waits for the browser to reconnect (default: 3.0 seconds)
:param message_history_length: maximum number of messages that will be stored and resent after a connection interruption (default: 1000, use 0 to disable)
:param mount_path: mount NiceGUI at this path (default: `'/'`)
:param on_air: tech preview: `allows temporary remote access <https://nicegui.io/documentation/section_configuration_deployment#nicegui_on_air>`_ if set to `True` (default: disabled)
:param tailwind: whether to use Tailwind CSS (experimental, default: `True`)
Expand All @@ -52,6 +54,7 @@ def run_with(
language=language,
binding_refresh_interval=binding_refresh_interval,
reconnect_timeout=reconnect_timeout,
message_history_length=message_history_length,
tailwind=tailwind,
prod_js=prod_js,
show_welcome_message=show_welcome_message,
Expand Down