diff --git a/README.md b/README.md
index 6aff4ccba3..0531446118 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@
-[](https://agent-zero.ai) [](https://github.com/sponsors/agent0ai) [](https://x.com/Agent0ai) [](https://discord.gg/B8KZKNsPpj) [](https://www.youtube.com/@AgentZeroFW) [](https://www.linkedin.com/in/jan-tomasek/) [](https://warpcast.com/agent-zero)
+[](https://agent-zero.ai) [](https://github.com/sponsors/agent0ai) [](https://x.com/Agent0ai) [](https://discord.gg/B8KZKNsPpj) [](https://www.youtube.com/@AgentZeroFW) [](https://www.linkedin.com/in/jan-tomasek/) [](https://warpcast.com/agent-zero)
## Documentation:
@@ -14,7 +14,7 @@
[Introduction](#a-personal-organic-agentic-framework-that-grows-and-learns-with-you) •
[Installation](./docs/installation.md) •
[Development](./docs/development.md) •
-[Extensibility](./docs/extensibility.md) •
+[WebSocket Infrastructure](./docs/websocket-infrastructure.md) •
[Connectivity](./docs/connectivity.md) •
[How to update](./docs/installation.md#how-to-update-agent-zero) •
[Documentation](./docs/README.md) •
@@ -158,6 +158,7 @@ docker run -p 50001:80 agent0ai/agent-zero
| [Installation](./docs/installation.md) | Installation, setup and configuration |
| [Usage](./docs/usage.md) | Basic and advanced usage |
| [Development](./docs/development.md) | Development and customization |
+| [WebSocket Infrastructure](./docs/websocket-infrastructure.md) | Real-time WebSocket handlers, client APIs, filtering semantics, envelopes |
| [Extensibility](./docs/extensibility.md) | Extending Agent Zero |
| [Connectivity](./docs/connectivity.md) | External API endpoints, MCP server connections, A2A protocol |
| [Architecture](./docs/architecture.md) | System design and components |
@@ -265,7 +266,7 @@ docker run -p 50001:80 agent0ai/agent-zero
- More space efficient on mobile
- Streamable HTTP MCP servers support
- LLM API URL added to models config for Azure, local and custom providers
-
+
### v0.9.0 - Agent roles, backup/restore
[Release video](https://www.youtube.com/watch?v=rMIe-TC6H-k)
diff --git a/agent.py b/agent.py
index c5c79a55d2..f7aa2f245d 100644
--- a/agent.py
+++ b/agent.py
@@ -1,4 +1,4 @@
-import asyncio, random, string
+import asyncio, random, string, threading
import nest_asyncio
nest_asyncio.apply()
@@ -46,6 +46,7 @@ class AgentContextType(Enum):
class AgentContext:
_contexts: dict[str, "AgentContext"] = {}
+ _contexts_lock = threading.RLock()
_counter: int = 0
_notification_manager = None
@@ -67,10 +68,14 @@ def __init__(
):
# initialize context
self.id = id or AgentContext.generate_id()
- existing = self._contexts.get(self.id, None)
- if existing:
- AgentContext.remove(self.id)
- self._contexts[self.id] = self
+ existing = None
+ with AgentContext._contexts_lock:
+ existing = AgentContext._contexts.get(self.id, None)
+ if existing:
+ AgentContext._contexts.pop(self.id, None)
+ AgentContext._contexts[self.id] = self
+ if existing and existing.task:
+ existing.task.kill()
if set_current:
AgentContext.set_current(self.id)
@@ -95,7 +100,8 @@ def __init__(
@staticmethod
def get(id: str):
- return AgentContext._contexts.get(id, None)
+ with AgentContext._contexts_lock:
+ return AgentContext._contexts.get(id, None)
@staticmethod
def use(id: str):
@@ -119,13 +125,15 @@ def set_current(ctxid: str):
@staticmethod
def first():
- if not AgentContext._contexts:
- return None
- return list(AgentContext._contexts.values())[0]
+ with AgentContext._contexts_lock:
+ if not AgentContext._contexts:
+ return None
+ return list(AgentContext._contexts.values())[0]
@staticmethod
def all():
- return list(AgentContext._contexts.values())
+ with AgentContext._contexts_lock:
+ return list(AgentContext._contexts.values())
@staticmethod
def generate_id():
@@ -134,8 +142,9 @@ def generate_short_id():
while True:
short_id = generate_short_id()
- if short_id not in AgentContext._contexts:
- return short_id
+ with AgentContext._contexts_lock:
+ if short_id not in AgentContext._contexts:
+ return short_id
@classmethod
def get_notification_manager(cls):
@@ -147,7 +156,8 @@ def get_notification_manager(cls):
@staticmethod
def remove(id: str):
- context = AgentContext._contexts.pop(id, None)
+ with AgentContext._contexts_lock:
+ context = AgentContext._contexts.pop(id, None)
if context and context.task:
context.task.kill()
return context
diff --git a/docs/README.md b/docs/README.md
index b522ed0c61..70854e542e 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -6,6 +6,7 @@ To begin with Agent Zero, follow the links below for detailed guides on various
- **[Usage Guide](usage.md):** Explore GUI features and usage scenarios.
- **[Development](development.md):** Set up a development environment for Agent Zero.
- **[Extensibility](extensibility.md):** Learn how to create custom extensions for Agent Zero.
+- **[WebSocket Infrastructure](websocket-infrastructure.md):** Build real-time features with bidirectional handlers and client APIs.
- **[Connectivity](connectivity.md):** Learn how to connect to Agent Zero from other applications.
- **[Architecture Overview](architecture.md):** Understand the internal workings of the framework.
- **[Contributing](contribution.md):** Learn how to contribute to the Agent Zero project.
@@ -58,7 +59,8 @@ To begin with Agent Zero, follow the links below for detailed guides on various
- [Knowledge](architecture.md#5-knowledge)
- [Instruments](architecture.md#6-instruments)
- [Extensions](architecture.md#7-extensions)
- - [Contributing](contribution.md)
+- [WebSocket Infrastructure](websocket-infrastructure.md)
+- [Development](development.md)
+- [Contributing](contribution.md)
- [Getting Started](contribution.md#getting-started)
- [Making Changes](contribution.md#making-changes)
- [Submitting a Pull Request](contribution.md#submitting-a-pull-request)
diff --git a/docs/contribution.md b/docs/contribution.md
index 498577eb3e..3ee4d73bc2 100644
--- a/docs/contribution.md
+++ b/docs/contribution.md
@@ -6,6 +6,7 @@ Contributions to improve Agent Zero are very welcome! This guide outlines how t
- See [development](development.md) for instructions on how to set up a development environment.
- See [extensibility](extensibility.md) for instructions on how to create custom extensions.
+- See [websocket infrastructure](websocket-infrastructure.md) for guidance on building real-time handlers and client integrations.
1. **Fork the Repository:** Fork the Agent Zero repository on GitHub.
2. **Clone Your Fork:** Clone your forked repository to your local machine.
@@ -27,4 +28,4 @@ Contributions to improve Agent Zero are very welcome! This guide outlines how t
## Documentation Stack
-- The documentation is built using Markdown. We appreciate your contributions even if you don't know Markdown, and look forward to improve Agent Zero for everyone's benefit.
\ No newline at end of file
+- The documentation is built using Markdown. We appreciate your contributions even if you don't know Markdown, and look forward to improving Agent Zero for everyone's benefit.
diff --git a/docs/development.md b/docs/development.md
index 9faa1805bd..93344c0e16 100644
--- a/docs/development.md
+++ b/docs/development.md
@@ -68,7 +68,7 @@ Now when you select one of the python files in the project, you should see prope
```bash
pip install -r requirements.txt
playwright install chromium
-```
+```
These will install all the python packages and browser binaries for playwright (browser agent).
Errors in the code editor caused by missing packages should now be gone. If not, try reloading the window.
@@ -81,7 +81,9 @@ It will not be able to do code execution and few other features requiring the Do

-The framework will run at the default port 5000. If you open `http://localhost:5000` in your browser and see `ERR_EMPTY_RESPONSE`, don't panic, you may need to select another port like I did for some reason. If you need to change the defaut port, you can add `"--port=5555"` to the args in the `.vscode/launch.json` file or you can create a `.env` file in the root directory and set the `WEB_UI_PORT` variable to the desired port.
+The framework will run at the default port 5000. If you open `http://localhost:5000` in your browser and see `ERR_EMPTY_RESPONSE`, don't panic; you may simply need to select another port. If you need to change the default port, you can add `"--port=5555"` to the args in the `.vscode/launch.json` file, or you can create a `.env` file in the root directory and set the `WEB_UI_PORT` variable to the desired port.
+
+You can also set the bind host via `"--host=0.0.0.0"` (or `WEB_UI_HOST=0.0.0.0`).
It may take a while the first time. You should see output like the screenshot below. The RFC error is ok for now as we did not yet connect our local development to another instance in docker.

@@ -147,6 +149,7 @@ You're now ready to contribute to Agent Zero, create custom extensions, or modif
## Next steps
- See [extensibility](extensibility.md) for instructions on how to create custom extensions.
+- See [websocket infrastructure](websocket-infrastructure.md) for real-time handler patterns, client APIs, and troubleshooting tips.
- See [contribution](contribution.md) for instructions on how to contribute to the framework.
## Configuration via Environment Variables
@@ -167,4 +170,4 @@ These environment variables automatically override the hardcoded defaults in `ge
- You can use the `DockerfileLocal` to build your docker image.
- Navigate to your project root in the terminal and run `docker build -f DockerfileLocal -t agent-zero-local --build-arg CACHE_DATE=$(date +%Y-%m-%d:%H:%M:%S) .`
- The `CACHE_DATE` argument is optional, it is used to cache most of the build process and only rebuild the last steps when the files or dependencies change.
-- See `docker/run/build.txt` for more build command examples.
\ No newline at end of file
+- See `docker/run/build.txt` for more build command examples.
diff --git a/docs/installation.md b/docs/installation.md
index c611b1b798..1611250304 100644
--- a/docs/installation.md
+++ b/docs/installation.md
@@ -10,7 +10,7 @@ The following user guide provides instructions for installing and running Agent
## Windows, macOS and Linux Setup Guide
-1. **Install Docker Desktop:**
+1. **Install Docker Desktop:**
- Docker Desktop provides the runtime environment for Agent Zero, ensuring consistent behavior and security across platforms
- The entire framework runs within a Docker container, providing isolation and easy deployment
- Available as a user-friendly GUI application for all major operating systems
@@ -23,8 +23,8 @@ The following user guide provides instructions for installing and running Agent
> [!NOTE]
-> **Linux Users:** You can install either Docker Desktop or docker-ce (Community Edition).
-> For Docker Desktop, follow the instructions for your specific Linux distribution [here](https://docs.docker.com/desktop/install/linux-install/).
+> **Linux Users:** You can install either Docker Desktop or docker-ce (Community Edition).
+> For Docker Desktop, follow the instructions for your specific Linux distribution [here](https://docs.docker.com/desktop/install/linux-install/).
> For docker-ce, follow the instructions [here](https://docs.docker.com/engine/install/).
>
> If you're using docker-ce, you'll need to add your user to the `docker` group:
@@ -44,14 +44,14 @@ The following user guide provides instructions for installing and running Agent
-1.4. Once installed, launch Docker Desktop:
+1.4. Once installed, launch Docker Desktop:
> [!NOTE]
-> **MacOS Configuration:** In Docker Desktop's preferences (Docker menu) → Settings →
+> **MacOS Configuration:** In Docker Desktop's preferences (Docker menu) → Settings →
> Advanced, enable "Allow the default Docker socket to be used (requires password)."

@@ -189,8 +189,8 @@ Optionally you can map local folders for file persistence:
> You can also access the Web UI by clicking the ports right under the container ID in Docker Desktop.
> [!NOTE]
-> After starting the container, you'll find all Agent Zero files in your chosen
-> directory. You can access and edit these files directly on your machine, and
+> After starting the container, you'll find all Agent Zero files in your chosen
+> directory. You can access and edit these files directly on your machine, and
> the changes will be immediately reflected in the running container.
3. Configure Agent Zero
@@ -306,7 +306,7 @@ ollama pull
2. A CLI message should confirm the model download on your system
#### Selecting your model within Agent Zero
-1. Once you've downloaded your model(s), you must select it in the Settings page of the GUI.
+1. Once you've downloaded your model(s), you must select it in the Settings page of the GUI.
2. Within the Chat model, Utility model, or Embedding model section, choose Ollama as provider.
@@ -321,7 +321,7 @@ ollama pull
#### Managing your downloaded models
Once you've downloaded some models, you might want to check which ones you have available or remove any you no longer need.
-- **Listing downloaded models:**
+- **Listing downloaded models:**
To see a list of all the models you've downloaded, use the command:
```
ollama list
@@ -356,8 +356,10 @@ Agent Zero's Web UI is accessible from any device on your network through the Do
> - The port is automatically assigned by Docker unless you specify one
> [!NOTE]
-> If you're running Agent Zero directly on your system (legacy approach) instead of
-> using Docker, you'll need to configure the host manually in `run_ui.py` to run on all interfaces using `host="0.0.0.0"`.
+> If you're running Agent Zero directly on your system (legacy approach) instead of
+> using Docker, configure the bind address/ports via flags or environment variables:
+> - Use `--host 0.0.0.0` (or set `WEB_UI_HOST=0.0.0.0` in `.env`) to listen on all interfaces.
+> - Use `--port <port>` (or `WEB_UI_PORT`) to pick the HTTP port.
For developers or users who need to run Agent Zero directly on their system,see the [In-Depth Guide for Full Binaries Installation](#in-depth-guide-for-full-binaries-installation).
@@ -418,9 +420,8 @@ For developers or users who need to run Agent Zero directly on their system,see
> docker run -p $PORT:80 -v /path/to/your/data:/a0 agent0ai/agent-zero
> ```
-
+
### Conclusion
-After following the instructions for your specific operating system, you should have Agent Zero successfully installed and running. You can now start exploring the framework's capabilities and experimenting with creating your own intelligent agents.
+After following the instructions for your specific operating system, you should have Agent Zero successfully installed and running. You can now start exploring the framework's capabilities and experimenting with creating your own intelligent agents.
If you encounter any issues during the installation process, please consult the [Troubleshooting section](troubleshooting.md) of this documentation or refer to the Agent Zero [Skool](https://www.skool.com/agent-zero) or [Discord](https://discord.gg/B8KZKNsPpj) community for assistance.
-
diff --git a/docs/quickstart.md b/docs/quickstart.md
index 437cc9b65d..91b4cb0e67 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -4,22 +4,25 @@ This guide provides a quick introduction to using Agent Zero. We'll cover launch
## Launching the Web UI
1. Make sure you have Agent Zero installed and your environment set up correctly (refer to the [Installation guide](installation.md) if needed).
2. Open a terminal in the Agent Zero directory and activate your conda environment (if you're using one).
-3. Run the following command:
+3. Run the following command (see the notes below for host/port options):
```bash
python run_ui.py
```
-4. A message similar to this will appear in your terminal, indicating the Web UI is running:
+Notes:
+- The HTTP server binds to the address and port given by `--host`/`--port` (or `WEB_UI_HOST`/`WEB_UI_PORT`; the default port is 5000).
+
+4. A message similar to this will appear in your terminal, indicating the Web UI is running:

-5. Open your web browser and navigate to the URL shown in the terminal (usually `http://127.0.0.1:50001`). You should see the Agent Zero Web UI.
+5. Open your web browser and navigate to the URL shown in the terminal (usually `http://127.0.0.1:5000`). You should see the Agent Zero Web UI.

> [!TIP]
-> As you can see, the Web UI has four distinct buttons for easy chat management:
+> As you can see, the Web UI has four distinct buttons for easy chat management:
> `New Chat`, `Reset Chat`, `Save Chat`, and `Load Chat`.
> Chats can be saved and loaded individually in `json` format and are stored in the
> `/tmp/chats` directory.
@@ -49,6 +52,6 @@ Now that you've run a simple task, you can experiment with more complex requests
* Create or modify files
> [!TIP]
-> The [Usage Guide](usage.md) provides more in-depth information on using Agent
-> Zero's various features, including prompt engineering, tool usage, and multi-agent
-> cooperation.
\ No newline at end of file
+> The [Usage Guide](usage.md) provides more in-depth information on using Agent
+> Zero's various features, including prompt engineering, tool usage, and multi-agent
+> cooperation.
diff --git a/docs/usage.md b/docs/usage.md
index 045d9095a8..1e1f16c868 100644
--- a/docs/usage.md
+++ b/docs/usage.md
@@ -102,6 +102,9 @@ Agent Zero's power comes from its ability to use [tools](architecture.md#tools).
- **Understand Tools:** Agent Zero includes default tools like knowledge (powered by SearXNG), code execution, and communication. Understand the capabilities of these tools and how to invoke them.
+### Real-Time WebSocket Features
+- Use WebSockets when you need bidirectional, low-latency updates. The [WebSocket Infrastructure guide](websocket-infrastructure.md) explains the backend handler framework, client API, filtering semantics, and common producer/consumer patterns.
+
## Example of Tools Usage: Web Search and Code Execution
Let's say you want Agent Zero to perform some financial analysis tasks. Here's a possible prompt:
diff --git a/docs/websocket-infrastructure.md b/docs/websocket-infrastructure.md
new file mode 100644
index 0000000000..24a11ae95b
--- /dev/null
+++ b/docs/websocket-infrastructure.md
@@ -0,0 +1,731 @@
+# WebSocket Infrastructure Guide
+
+**Audience**: Backend and frontend developers building real-time features on Agent Zero
+**Updated**: 2026-01-02
+**Related Specs**: `specs/003-websocket-event-handlers/*`
+
+This guide consolidates everything you need to design, implement, and troubleshoot Agent Zero WebSocket flows. It complements the feature specification by describing day-to-day developer tasks, showing how backend handlers and frontend clients cooperate, and documenting practical patterns for producers and consumers on both sides of the connection.
+
+---
+
+## Table of Contents
+
+1. [Architecture at a Glance](#architecture-at-a-glance)
+2. [Terminology & Metadata](#terminology--metadata)
+3. [Connection Lifecycle](#connection-lifecycle)
+4. [Backend Cookbook (Handlers & Manager)](#backend-cookbook-handlers--manager)
+5. [Frontend Cookbook (websocket.js)](#frontend-cookbook-websocketjs)
+6. [Producer & Consumer Patterns](#producer--consumer-patterns)
+7. [Metadata Flow & Envelopes](#metadata-flow--envelopes)
+8. [Diagnostics, Harness & Logging](#diagnostics-harness--logging)
+9. [Best Practices Checklist](#best-practices-checklist)
+10. [Quick Reference Tables](#quick-reference-tables)
+11. [Further Reading](#further-reading)
+
+---
+
+## Architecture at a Glance
+
+- **Runtime (`run_ui.py`)** – boots `python-socketio.AsyncServer` inside an ASGI stack served by Uvicorn. Flask routes are mounted via `uvicorn.middleware.wsgi.WSGIMiddleware`, and Flask + Socket.IO share the same process so session cookies and CSRF semantics stay aligned.
+- **Singleton handlers** – every `WebSocketHandler` subclass exposes `get_instance()` and is registered exactly once. Direct instantiation raises `SingletonInstantiationError`, keeping shared state and lifecycle hooks deterministic.
+- **Dispatcher offload** – handler entrypoints (`process_event`, `on_connect`, `on_disconnect`) run in a background worker loop (via `DeferredTask`) so blocking handlers cannot stall the Socket.IO transport. Socket.IO emits/disconnects are marshalled back to the dispatcher loop. Diagnostic timing and payload summaries are only built when Event Console watchers are subscribed (development mode).
+- **`python/helpers/websocket_manager.py`** – orchestrates routing, buffering, aggregation, metadata envelopes, and session tracking. Think of it as the “switchboard” for every WebSocket event.
+- **`python/helpers/websocket.py`** – base class for application handlers. Provides lifecycle hooks, helper methods (`emit_to`, `broadcast`, `request`, `request_all`) and identifier metadata.
+- **`webui/js/websocket.js`** – frontend singleton exposing a minimal client API (`emit`, `request`, `on`, `off`) with lazy connection management and development-only logging (no client-side `broadcast()` or `requestAll()` helpers).
+- **Developer Harness (`webui/components/settings/developer/websocket-test-store.js`)** – manual and automatic validation suite for emit/request flows, timeout behaviour (including the default unlimited wait), correlation ID propagation, envelope metadata, subscription persistence across reconnect, and development-mode diagnostics.
+- **Specs & Contracts** – canonical definitions live under `specs/003-websocket-event-handlers/`. This guide references those documents but focuses on applied usage.
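+
+The singleton rule above is worth seeing in miniature. Below is an illustrative sketch of the pattern, not the framework's actual base class (which lives in `python/helpers/websocket.py`); the `get_instance()` and `SingletonInstantiationError` names come from the text, everything else is assumed for illustration.
+
+```python
+class SingletonInstantiationError(RuntimeError):
+    """Raised when a handler is constructed directly instead of via get_instance()."""
+
+
+class SingletonHandler:
+    _instance = None       # one shared instance per subclass
+    _allow_create = False  # flipped only inside get_instance()
+
+    def __init__(self):
+        if not type(self)._allow_create:
+            raise SingletonInstantiationError(
+                f"Use {type(self).__name__}.get_instance() instead of direct construction"
+            )
+
+    @classmethod
+    def get_instance(cls):
+        # Lazily create and cache the single instance for this subclass.
+        if cls._instance is None:
+            cls._allow_create = True
+            try:
+                cls._instance = cls()
+            finally:
+                cls._allow_create = False
+        return cls._instance
+
+
+class ExampleHandler(SingletonHandler):
+    pass
+
+
+assert ExampleHandler.get_instance() is ExampleHandler.get_instance()
+```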
+
+---
+
+## Terminology & Metadata
+
+| Term | Where it Appears | Meaning |
+|------|------------------|---------|
+| `sid` | Socket.IO | Connection identifier for a Socket.IO namespace connection. With only the root namespace (`/`), each tab has one `sid`. When connecting to multiple namespaces, a tab has one `sid` per namespace. Treat connection identity as `(namespace, sid)`. |
+| `handlerId` | Manager Envelope | Fully-qualified Python class name (e.g., `python.websocket_handlers.notifications.NotificationHandler`). Used for result aggregation and logging. |
+| `eventId` | Manager Envelope | UUIDv4 generated for every server→client delivery. Unique per emission. Useful when correlating broadcast fan-out or diagnosing duplicates. |
+| `correlationId` | Bidirectional flows | Thread that ties together request, response, and any follow-up events. Client may supply one; otherwise the manager generates and echoes it everywhere. |
+| `data` | Envelope payload | Application payload you define. Always a JSON-serialisable object. |
+| `user_to_sids` / `sid_to_user` | Manager session tracking | Single-user map today (`allUsers` bucket). Future-proof for multi-tenant routing but already handy when you need all active SIDs. |
+| Buffer | Manager | Up to 100 fire-and-forget events stored per temporarily disconnected SID (expires after 1 hour). Request/response events never buffer—clients receive standardised errors instead. |
+
+Useful mental model: **client ↔ manager ↔ handler**. The manager normalises metadata and enforces routing; handlers focus on business logic; the frontend uses the same identifiers, so logs are easy to stitch.
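+
+The buffer row is easiest to grasp as code. A minimal sketch of the described semantics (100-event cap, one-hour expiry, fire-and-forget only); the real structures in `python/helpers/websocket_manager.py` may differ:
+
+```python
+import time
+from collections import deque
+
+BUFFER_LIMIT = 100          # fire-and-forget events kept per disconnected sid
+BUFFER_TTL_SECONDS = 3600   # buffers expire after one hour
+
+
+class SidBuffer:
+    def __init__(self):
+        self.created_at = time.monotonic()
+        # In this sketch the oldest entries drop first once the cap is hit.
+        self.events: deque = deque(maxlen=BUFFER_LIMIT)
+
+    def expired(self) -> bool:
+        return time.monotonic() - self.created_at > BUFFER_TTL_SECONDS
+
+
+buffers: dict[str, SidBuffer] = {}
+
+
+def buffer_event(sid: str, envelope: dict) -> None:
+    # Only fire-and-forget deliveries are buffered; request/response
+    # flows receive standardised errors instead (see the table above).
+    buffers.setdefault(sid, SidBuffer()).events.append(envelope)
+
+
+def flush_buffer(sid: str) -> list[dict]:
+    # Called on reconnect: replay everything that is still fresh.
+    buffer = buffers.pop(sid, None)
+    if buffer is None or buffer.expired():
+        return []
+    return list(buffer.events)
+```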
+
+---
+
+## Connection Lifecycle
+
+1. **Lazy Connect** – `/js/websocket.js` connects only when a consumer uses the client API (e.g., `emit`, `request`, `on`). Consumers may still explicitly `await websocket.connect()` to block UI until the socket is ready.
+2. **Handshake** – Socket.IO connects using the existing Flask session cookie and a CSRF token provided via the Socket.IO `auth` payload (`csrf_token`). The token is obtained from `GET /csrf_token` (see `/js/api.js#getCsrfToken()`), which also sets the runtime-scoped cookie `csrf_token_{runtime_id}`. The server validates the `Origin` header against an **allowlist** (RFC 6455 / OWASP CSWSH baseline) and then checks handler requirements (`requires_auth`, `requires_csrf`) before accepting.
+3. **Lifecycle Hooks** – After acceptance, `WebSocketHandler.on_connect(sid)` fires for every registered handler. Use it for initial emits, state bookkeeping, or session tracking.
+4. **Normal Operation** – Client emits events. Manager routes them to the appropriate handlers, gathers results, and wraps outbound deliveries in the mandatory envelope.
+5. **Disconnection & Buffering** – If a tab goes away without a graceful disconnect, fire-and-forget events accumulate (max 100). On reconnect, the manager flushes the buffer via `emit_to`. Request flows respond with explicit `CONNECTION_NOT_FOUND` errors.
+6. **Reconnection Attempts** – Socket.IO handles reconnect attempts; the manager continues to buffer fire-and-forget events (up to 1 hour) for temporarily disconnected SIDs and flushes them on reconnect.
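+
+Server-side, the handshake checks in step 2 map naturally onto a python-socketio connect handler. A hedged sketch follows; the allowlist, environ key, and the `is_valid_csrf_token` helper are illustrative assumptions, not the framework's actual code:
+
+```python
+import socketio
+
+sio = socketio.AsyncServer(async_mode="asgi")
+
+ALLOWED_ORIGINS = {"http://localhost:5000"}  # illustrative allowlist
+
+
+def is_valid_csrf_token(token: str) -> bool:
+    # Hypothetical helper: the real check compares against the session value.
+    return bool(token)
+
+
+@sio.event
+async def connect(sid, environ, auth):
+    # Baseline CSWSH mitigation: reject handshakes from unexpected origins.
+    origin = environ.get("HTTP_ORIGIN", "")
+    if origin not in ALLOWED_ORIGINS:
+        raise socketio.exceptions.ConnectionRefusedError("origin not allowed")
+
+    # The CSRF token arrives via the Socket.IO `auth` payload (step 2 above).
+    token = (auth or {}).get("csrf_token")
+    if not token or not is_valid_csrf_token(token):
+        raise socketio.exceptions.ConnectionRefusedError("invalid csrf token")
+```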
+
+### State Sync (Replacing `/poll`)
+
+Agent Zero can also push poll-shaped state snapshots over the WebSocket bus, replacing the legacy 4Hz `/poll` loop while preserving the existing UI update contract.
+
+- **Handshake**: the frontend sync store (`/components/sync/sync-store.js`) calls `websocket.request("state_request", { context, log_from, notifications_from, timezone })` to establish per-tab cursors and a `seq_base`.
+- **Push**: the server emits `state_push` events containing `{ runtime_epoch, seq, snapshot }`, where `snapshot` is exactly the `/poll` payload shape built by `python/helpers/state_snapshot.py`.
+- **Coalescing**: the backend `StateMonitor` coalesces dirties per SID (25ms window) so streaming updates stay smooth without unbounded trailing-edge debounce.
+- **Degraded fallback**: if the WebSocket handshake/push path is unhealthy, the UI enters `DEGRADED` and uses `/poll` as a fallback; while degraded, push snapshots are ignored to avoid racy double-writes.
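+
+The coalescing bullet can be pictured as a tiny per-SID asyncio debounce. This is a simplified sketch under that reading; the real `StateMonitor` tracks dirties per connection and emits real snapshots:
+
+```python
+import asyncio
+
+COALESCE_WINDOW_SECONDS = 0.025  # 25ms window, per the description above
+
+
+class Coalescer:
+    """Collapse bursts of 'dirty' marks into one snapshot push per window."""
+
+    def __init__(self, push_snapshot):
+        self.push_snapshot = push_snapshot   # async callable doing the emit
+        self._pending: asyncio.Task | None = None
+
+    def mark_dirty(self) -> None:
+        # The first dirty in a window schedules a push; later ones are absorbed.
+        if self._pending is None or self._pending.done():
+            self._pending = asyncio.create_task(self._flush_later())
+
+    async def _flush_later(self) -> None:
+        await asyncio.sleep(COALESCE_WINDOW_SECONDS)
+        await self.push_snapshot()
+
+
+async def demo():
+    async def push():
+        print("state_push emitted")
+
+    coalescer = Coalescer(push)
+    for _ in range(10):        # ten rapid dirties...
+        coalescer.mark_dirty()
+    await asyncio.sleep(0.05)  # ...yield a single push
+
+
+asyncio.run(demo())
+```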
+
+### Thinking in Roles
+
+- **Client** (frontend) is the page that imports `/js/websocket.js`. It acts as both a **producer** (calling `emit`, `request`) and a **consumer** (subscribing with `on`).
+- **Manager** (`WebSocketManager`) sits server-side and routes everything. It resolves correlation IDs, wraps envelopes, and fans out results.
+- **Handler** (`WebSocketHandler`) executes the application logic. Each handler may emit additional events back to the client or initiate its own requests to connected SIDs.
+
+### Flow Overview (by Operation)
+
+```
+Client emit() ───▶ Manager route_event() ───▶ Handler.process_event()
+ │ │ └──(fire-and-forget, no ack)
+ └── throws if └── validates payload + routes by namespace/event type
+ not connected updates last_activity
+
+Client request() ─▶ Manager route_event() ─▶ Handlers (async gather)
+ │ │ └── per-handler dict/None
+ │ │
+ │ └── builds {correlationId, results[]}
+ └── Promise resolves with aggregated results (timeouts become error items)
+
+Server emit_to() ──▶ Manager.emit_to() ──▶ Socket.IO delivery/buffer
+ │ │ └── envelope {handlerId,…}
+ └── raises ConnectionNotFoundError for unknown sid (never seen)
+
+Server broadcast() ─▶ Manager.broadcast()
+ │ └── iterates active sids (respecting exclude_sids)
+ │ └── delegates to `Manager.emit_to()` → `socketio.emit(..., to=sid)`
+ └── fire-and-forget (no ack)
+
+Server request() ─▶ Manager.request_for_sid() ─▶ route_event()
+ │ │ └── per-handler responses
+ └── Await aggregated {correlationId, results[]}
+
+Server request_all() ─▶ Manager.route_event_all() ─▶ route_event per sid
+ │ │ └── per-handler results
+ └── Await list[{sid, correlationId, results[]}]
+```
+
+These diagrams highlight the “who calls what” surface while the detailed semantics (envelopes, buffering, timeouts) remain consistent with the tables later in this guide.
+
+### End-to-End Examples
+
+1. **Client request ➜ multiple handlers**
+
+ 1. Frontend calls `websocket.request("refresh_metrics", payload)`.
+ 2. Manager routes to each handler registered for that event type and awaits `asyncio.gather`.
+ 3. Each handler returns a dict (or raises); the manager wraps them in `results[]` and resolves the Promise with `{ correlationId, results }`.
+ 4. The caller inspects per-handler data or errors, filtering by `handlerId` as needed.
+
+2. **Server broadcast with buffered replay**
+
+ 1. Handler invokes `self.broadcast("notification_broadcast", data, exclude_sids=sid)`.
+ 2. Manager iterates active connections. For connected SIDs it emits immediately with the mandatory envelope. For temporarily disconnected SIDs it enqueues into the per-SID buffer (up to 100 events).
+ 3. When a buffered SID reconnects, `_flush_buffer()` replays the queued envelopes preserving `handlerId`, `eventId`, `correlationId`, and `ts`.
+
+3. **Server request_all ➜ client-side confirmations**
+
+   1. Handler issues `await self.request_all("confirm_close", {"contextId": context_id}, timeout_ms=5000)`.
+ 2. Manager fans out to every active SID, allowing `exclude_handlers` when provided.
+ 3. Each subscribed client runs its `websocket.on("confirm_close", …)` callback and returns data through the Socket.IO acknowledgement.
+ 4. The handler receives `[{ sid, correlationId, results[] }]`, inspects each response, and proceeds accordingly.
+
+These expanded flows complement the operation matrix later in the guide, ensuring every combination (client/server × emit/request and server request_all) is covered explicitly.
+
+---
+
+## Backend Cookbook (Handlers & Manager)
+
+### 1. Handler Discovery & Setup
+
+Handlers are discovered deterministically from `python/websocket_handlers/`:
+
+- **File entry**: `python/websocket_handlers/state_sync_handler.py` → namespace `/state_sync`
+- **Folder entry**: `python/websocket_handlers/orders/` or `python/websocket_handlers/orders_handler/` → namespace `/orders` (loads `*.py` one level deep; ignores `__init__.py` and deeper nesting)
+- **Reserved root**: `python/websocket_handlers/_default.py` → namespace `/` (diagnostics-only by default)
+
+Create handler modules under the appropriate namespace entry and inherit from `WebSocketHandler`.
+
+```python
+from typing import Any
+
+from python.helpers.websocket import WebSocketHandler
+
+class DashboardHandler(WebSocketHandler):
+ @classmethod
+ def get_event_types(cls) -> list[str]:
+ return ["dashboard_refresh", "dashboard_push"]
+
+ async def process_event(self, event_type: str, data: dict[str, Any], sid: str) -> dict | None:
+ if event_type == "dashboard_refresh":
+ stats = await self._load_stats(data.get("scope", "all"))
+ return {"ok": True, "stats": stats}
+
+ if event_type == "dashboard_push":
+ await self.broadcast(
+ "dashboard_update",
+ {"stats": data.get("stats", {}), "source": sid},
+ exclude_sids=sid,
+ )
+ return None
+```
+
+Handlers are auto-loaded on startup; duplicate event declarations produce warnings but are supported. Use `validate_event_types` to ensure names follow lowercase snake_case and avoid Socket.IO reserved events.
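+
+As an illustration, a validation helper in the spirit of `validate_event_types` might look like the sketch below; the exact reserved-name list and error behaviour are assumptions:
+
+```python
+import re
+
+EVENT_NAME_RE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")
+RESERVED_EVENTS = {"connect", "disconnect", "connect_error"}  # assumed reserved set
+
+
+def validate_event_types(event_types: list[str]) -> None:
+    for name in event_types:
+        if name in RESERVED_EVENTS:
+            raise ValueError(f"'{name}' is a reserved Socket.IO event name")
+        if not EVENT_NAME_RE.match(name):
+            raise ValueError(f"'{name}' must be lowercase snake_case")
+
+
+validate_event_types(["dashboard_refresh", "dashboard_push"])  # passes
+```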
+
+### 2. Consuming Client Events (Server as Consumer)
+
+- Implement `process_event` and return either `None` (fire-and-forget) or a dict that becomes the handler’s contribution in `results[]`.
+- Use dependency injection (async functions, database calls, etc.) but keep event loop friendly—no blocking calls.
+- Validate input rigorously and return structured errors as needed.
+
+```python
+async def process_event(self, event_type: str, data: dict, sid: str) -> dict | None:
+ if "query" not in data:
+ return {"ok": False, "error": {"code": "VALIDATION", "error": "Missing query"}}
+
+ rows = await self.search_backend(data["query"], limit=data.get("limit", 25))
+ return {"ok": True, "data": rows, "count": len(rows)}
+```
+
+### 3. Producing Server Events (Server as Producer)
+
+Four helper methods mirror the frontend API. The table below summarises them (full table in [Quick Reference](#quick-reference-tables)).
+
+| Method | Target | Ack | Filters | Typical Use |
+|--------|--------|-----|---------|--------------|
+| `emit_to(sid, event, data, correlation_id=None)` | Single SID | No | None | Push job progress, or stream follow-up events for a request whose acknowledgement has already been produced. |
+| `broadcast(event, data, exclude_sids=None, correlation_id=None)` | All SIDs | No | `exclude_sids` only | Fan-out notifications, multi-tab sync while skipping the caller. |
+| `request(sid, event, data, timeout_ms=0)` | Single SID | Yes (`results[]`) | None | Ask the client to run local logic (e.g., UI confirmation) and gather per-handler results. |
+| `request_all(event, data, timeout_ms=0)` | All SIDs | Yes (`[{sid, results[]}]`) | None | Fan-out to every tab, e.g., “refresh your panel” or “confirm unsaved changes”. |
+
+Each helper automatically injects `handlerId`, obeys metadata envelopes, enforces routing rules, and handles timeouts:
+
+```python
+aggregated = await self.request_all(
+ "workspace_ping",
+ {"payload": {"reason": "health_check"}},
+ timeout_ms=2_000,
+)
+
+for entry in aggregated:
+ self.log.info("sid %s replied: %s", entry["sid"], entry["results"])
+```
+
+Timeouts convert into `{ "ok": False, "error": {"code": "TIMEOUT", ...} }`; they do **not** raise.
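+
+That "timeouts become results" rule maps cleanly onto `asyncio.wait_for`. A sketch of how a per-target timeout can be folded into a result entry instead of an exception (the manager's real aggregation logic is richer):
+
+```python
+import asyncio
+
+
+async def call_with_timeout(handler_id: str, coro, timeout_ms: int) -> dict:
+    # timeout_ms == 0 is treated as "wait indefinitely", matching the
+    # default unlimited wait mentioned elsewhere in this guide.
+    timeout = timeout_ms / 1000 if timeout_ms else None
+    try:
+        data = await asyncio.wait_for(coro, timeout)
+        return {"handlerId": handler_id, "ok": True, "data": data}
+    except asyncio.TimeoutError:
+        return {
+            "handlerId": handler_id,
+            "ok": False,
+            "error": {"code": "TIMEOUT", "error": "Request timeout"},
+        }
+
+
+async def demo():
+    async def slow():
+        await asyncio.sleep(1)
+        return {"value": 42}
+
+    print(await call_with_timeout("ExampleHandler", slow(), timeout_ms=100))
+
+
+asyncio.run(demo())
+```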
+
+### 4. Multi-Handler Aggregation
+
+- When multiple handlers subscribe to the same event, the manager invokes them concurrently with `asyncio.gather`. Aggregated results preserve registration order. Use correlation IDs to map responses to original triggers.
+- Client-side handler include/exclude filters are intentionally not supported. Consumers filter `results[]` by `handlerId` when needed.
+
+When no handler is registered for an event type, the manager synthesises an error entry instead of raising:
+
+```python
+if not results:
+ return {
+ "handlerId": self.identifier,
+ "ok": False,
+ "error": {"code": "NO_HANDLERS", "error": "No handler registered for this event type"},
+ }
+```
+
+### 5. Session Tracking Helpers
+
+`WebSocketManager` maintains lightweight mappings that you can use from handlers:
+
+```python
+import asyncio
+
+all_sids = self.manager.get_sids_for_user()      # today: every active sid
+maybe_user = self.manager.get_user_for_sid(sid) # currently None or "single_user"
+
+if updated_payload:
+ await asyncio.gather(
+ *[
+ self.emit_to(other_sid, "dashboard_update", updated_payload)
+ for other_sid in all_sids if other_sid != sid
+ ]
+ )
+```
+
+These helpers are future-proof for multi-tenant evolution and already handy to broadcast to every tab except the caller.
+
+**Future Multitenancy Mechanics**
+- **Registration**: When multi-user support ships, `handle_connect` will resolve the authenticated user identifier (e.g., from Flask session). `register()` will stash that identifier alongside the SID and place it into `user_to_sids[user_id]` while still populating the `allUsers` bucket for backward compatibility.
+- **Lookups**: `get_sids_for_user(user_id)` will return the tenant-specific SID set. Omitting the argument (or passing `None`) keeps today’s behaviour and yields the full `allUsers` list. `get_user_for_sid(sid)` will expose whichever identifier was recorded at registration.
+- **Utility**: These primitives unlock future features such as sending workspace notifications to every tab owned by the same account, ejecting all sessions for a suspended user, or correlating request/response traffic per tenant without rewriting handlers.
+- **Migration Story**: Existing handler code that loops over `get_sids_for_user()` automatically gains tenant-scoped behaviour once callers pass a `user_id`. Tests will exercise both single-user (default) and multi-tenant branches to guarantee compatibility.
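+
+Concretely, the registration and lookup rules above could reduce to a pair of maps. A sketch under the stated assumptions (names follow the text; the eventual implementation may differ):
+
+```python
+ALL_USERS = "allUsers"  # single-user bucket used today
+
+user_to_sids: dict[str, set[str]] = {ALL_USERS: set()}
+sid_to_user: dict[str, str] = {}
+
+
+def register(sid: str, user_id: str | None = None) -> None:
+    # Always populate the allUsers bucket for backward compatibility.
+    user_to_sids[ALL_USERS].add(sid)
+    if user_id is not None:
+        user_to_sids.setdefault(user_id, set()).add(sid)
+        sid_to_user[sid] = user_id
+
+
+def get_sids_for_user(user_id: str | None = None) -> set[str]:
+    # Omitting user_id keeps today's behaviour: every active sid.
+    return set(user_to_sids.get(user_id or ALL_USERS, set()))
+
+
+def get_user_for_sid(sid: str) -> str | None:
+    return sid_to_user.get(sid)
+```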
+
+---
+
+## Frontend Cookbook (`websocket.js`)
+
+### 1. Connecting
+
+```javascript
+import { getNamespacedClient } from "/js/websocket.js";
+
+const websocket = getNamespacedClient("/"); // reserved root (diagnostics-only by default)
+
+// Optional: await the handshake if you need to block UI until the socket is ready
+await websocket.connect();
+
+// Runtime metadata is exposed globally for Alpine stores / harness
+console.log(window.runtimeInfo.id, window.runtimeInfo.isDevelopment);
+```
+
+- The module connects lazily when a consumer uses the client API (e.g., `emit`, `request`, `on`). Components may still explicitly `await websocket.connect()` to block rendering on readiness or re-run diagnostics.
+- The server enforces an Origin allowlist during the Socket.IO connect handshake (baseline CSWSH mitigation). The browser session cookie remains the authentication mechanism, and CSRF is validated via the Socket.IO `auth` payload (`csrf_token`) plus the runtime-scoped CSRF cookie and session value.
+- Socket.IO handles reconnection attempts automatically.
+
+### Namespaces (end-state)
+
+- The root namespace (`/`) is reserved and intentionally unhandled by default for application events. Feature code should connect to an explicit namespace (for example `/state_sync`).
+- The frontend exposes `createNamespacedClient(namespace)` and `getNamespacedClient(namespace)` (one client instance per namespace per tab). Namespaced clients expose the same minimal API: `emit`, `request`, `on`, `off`.
+- Unknown namespaces are rejected deterministically during the Socket.IO connect handshake with a `connect_error` payload:
+ - `err.message === "UNKNOWN_NAMESPACE"`
+ - `err.data === { code: "UNKNOWN_NAMESPACE", namespace: "/requested" }`
+
+### 2. Client Operations
+
+- **Producers (client → server)** use `emit` and `request`. Payloads must be objects; primitive payloads throw.
+- **Consumers (server → client)** register callbacks with `on(eventType, callback)` and remove them with `off()`.
+
+Example (producer):
+
+```javascript
+await websocket.request("hello_request", { name: this.name }, {
+ timeoutMs: 1500,
+ correlationId: `greet-${crypto.randomUUID()}`,
+});
+```
+
+Example (consumer):
+
+```javascript
+websocket.on("dashboard_update", (envelope) => {
+ const { handlerId, correlationId, ts, data } = envelope;
+ this.debugLog({ handlerId, correlationId, ts });
+ this.rows = data.rows;
+});
+
+// Later, during cleanup
+websocket.off("dashboard_update");
+```
+
+### 3. Envelope Awareness
+
+Subscribers always receive:
+
+```typescript
+interface ServerDeliveryEnvelope {
+ handlerId: string;
+ eventId: string;
+ correlationId: string;
+ ts: string; // ISO8601 UTC with millisecond precision
+ data: object;
+}
+```
+
+Even if existing components only look at `data`, you should record `handlerId` and `correlationId` when building new features—doing so simplifies debugging multi-tab flows.
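+
+For reference, the server-side construction of this envelope can be sketched in a few lines of Python, consistent with the metadata cheat sheet later in this guide (the manager's actual helper may differ in detail):
+
+```python
+import uuid
+from datetime import datetime, timezone
+
+
+def build_envelope(handler_id: str, data: dict, correlation_id: str | None = None) -> dict:
+    # ISO8601 UTC with millisecond precision, normalised to a trailing "Z".
+    ts = (
+        datetime.now(timezone.utc)
+        .isoformat(timespec="milliseconds")
+        .replace("+00:00", "Z")
+    )
+    return {
+        "handlerId": handler_id,
+        "eventId": str(uuid.uuid4()),                         # unique per delivery
+        "correlationId": correlation_id or uuid.uuid4().hex,  # preserved if supplied
+        "ts": ts,
+        "data": data,
+    }
+
+
+envelope = build_envelope(
+    "python.websocket_handlers.notifications.NotificationHandler",
+    {"message": "Hello!"},
+)
+```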
+
+### 4. Development-Only Logging
+
+`websocket.debugLog()` writes to the console only when `runtimeInfo.isDevelopment` is true. Use it liberally when diagnosing event flows without polluting production logs.
+
+```javascript
+websocket.debugLog("request", { correlationId: payload.correlationId, timeoutMs });
+```
+
+### 5. Helper Utilities
+
+`webui/js/websocket.js` exports helper utilities alongside the `websocket` singleton so correlation metadata and envelopes stay consistent:
+
+- `createCorrelationId(prefix?: string)` returns a UUID-based identifier, optionally prefixed (e.g. `createCorrelationId('hello') → hello-1234…`). Use it when chaining UI actions to backend logs.
+- `validateServerEnvelope(envelope)` guarantees subscribers receive the canonical `{ handlerId, eventId, correlationId, ts, data }` shape and throws if the payload is malformed.
+
+Example:
+
+```javascript
+import { getNamespacedClient, createCorrelationId, validateServerEnvelope } from '/js/websocket.js';
+
+const websocket = getNamespacedClient('/state_sync');
+
+const { results } = await websocket.request(
+ 'hello_request',
+ { name: this.name },
+ { correlationId: createCorrelationId('hello') },
+);
+
+websocket.on('dashboard_update', (envelope) => {
+ const validated = validateServerEnvelope(envelope);
+ this.rows = validated.data.rows;
+});
+```
+
+### 6. Error Handling
+
+- Producer methods call `websocket.connect()` internally, so they wait for the handshake automatically. They only surface `Error("Not connected")` if the handshake ultimately fails (for example, the user is logged out or the server is down).
+- `request()` acknowledgement timeouts reject with `Error("Request timeout")`. Server-side fan-out timeouts (for example `request_all`) are represented as `results[]` entries with `error.code = "TIMEOUT"` (no Promise rejection).
+- For large payloads, the client throws before sending and the server rejects frames above the 50 MB cap (`max_http_buffer_size` on the Socket.IO engine).
+
+### 7. Startup Broadcast
+
+- When **Broadcast server restart event** is enabled in Developer settings (on by default) the backend emits a fire-and-forget `server_restart` envelope the first time each connection is established after a process restart. The payload includes `runtimeId` and an ISO8601 timestamp so clients can reconcile cached state.
+- Disable the toggle if your deployment pipeline already publishes restart notifications.
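+
+A sketch of what the emission could look like, assuming a small helper wired into a handler's `on_connect`; the flag name comes from Developer settings, everything else here is illustrative:
+
+```python
+from datetime import datetime, timezone
+
+
+class RestartNotifier:
+    """Illustrative helper: emit `server_restart` once per connection after boot."""
+
+    def __init__(self, emit_to, runtime_id: str, enabled: bool = True):
+        self.emit_to = emit_to          # async callable: (sid, event, data)
+        self.runtime_id = runtime_id
+        self.enabled = enabled          # mirrors websocket_server_restart_enabled
+        self._notified: set[str] = set()
+
+    async def on_connect(self, sid: str) -> None:
+        if not self.enabled or sid in self._notified:
+            return
+        self._notified.add(sid)
+        await self.emit_to(sid, "server_restart", {
+            "runtimeId": self.runtime_id,
+            "ts": datetime.now(timezone.utc).isoformat(),
+        })
+```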
+
+---
+
+## Frontend Error Handling (Using the Registry)
+
+Client code should treat `RequestResultItem.error.code` as one of the documented values and branch behavior accordingly. Keep UI decisions localized and reusable.
+
+Recommended patterns
+- Centralize mapping from `WsErrorCode` → user-facing message and remediation hint.
+- Always surface hard errors (timeouts); gate debug details by dev flag.
+
+Example – request()
+```javascript
+import { getNamespacedClient } from '/js/websocket.js'
+
+const websocket = getNamespacedClient('/state_sync')
+
+function renderError(code, message) {
+ // Map codes to UI copy; keep messages concise
+ switch (code) {
+ case 'NO_HANDLERS': return `No handler for this action (${message})`
+ case 'TIMEOUT': return `Request timed out; try again or increase timeout`
+ case 'CONNECTION_NOT_FOUND': return `Target connection unavailable; retry after reconnect`
+ default: return message || 'Unexpected error'
+ }
+}
+
+const res = await websocket.request('example_event', { foo: 'bar' }, { timeoutMs: 1500 })
+for (const item of res.results) {
+ if (item.ok) {
+ // use item.data
+ } else {
+ const msg = renderError(item.error?.code, item.error?.error)
+ // show toast/log based on dev flag
+ console.error('[ws]', msg)
+ }
+}
+```
+
+Subscriptions – envelope handler
+```javascript
+import { getNamespacedClient } from '/js/websocket.js'
+
+const websocket = getNamespacedClient('/state_sync')
+
+websocket.on('example_broadcast', ({ data, handlerId, eventId, correlationId }) => {
+ // handle data; errors should not typically arrive via broadcast
+ // correlationId can link UI actions to backend logs
+})
+```
+
+See also
+- Error Codes Registry (below) for the authoritative code list
+- Contracts: `frontend-api.md` for method signatures and response shapes
+
+---
+
+## Producer & Consumer Patterns
+
+### Pattern A – Fire-and-Forget Notification (Server Producer → Client Consumers)
+
+Backend:
+
+```python
+from datetime import datetime, timezone
+
+await self.broadcast(
+ "notification_broadcast",
+ {
+ "message": data["message"],
+ "level": data.get("level", "info"),
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ },
+ exclude_sids=sid,
+ correlation_id=data.get("correlationId"),
+)
+```
+
+Frontend:
+
+```javascript
+websocket.on("notification_broadcast", ({ data, correlationId, ts }) => {
+ notifications.unshift({ ...data, correlationId, ts });
+});
+```
+
+### Pattern B – Request/Response With Multi-Handler Aggregation (Client Producer → Server Consumers)
+
+Client:
+
+```javascript
+const { correlationId, results } = await websocket.request(
+ "refresh_metrics",
+ { duration: "1h" },
+ { timeoutMs: 2_000 }
+);
+
+results.forEach(({ handlerId, ok, data, error }) => {
+ if (ok) renderMetrics(handlerId, data);
+ else console.warn(handlerId, error);
+});
+```
+
+Server (two handlers listening to the same event):
+
+```python
+class TaskMetrics(WebSocketHandler):
+ @classmethod
+ def get_event_types(cls) -> list[str]:
+ return ["refresh_metrics"]
+
+ async def process_event(self, event_type: str, data: dict, sid: str) -> dict | None:
+ stats = await self._load_task_metrics(data["duration"])
+ return {"metrics": stats}
+
+class HostMetrics(WebSocketHandler):
+ @classmethod
+ def get_event_types(cls) -> list[str]:
+ return ["refresh_metrics"]
+
+ async def process_event(self, event_type: str, data: dict, sid: str) -> dict | None:
+ return {"metrics": await self._load_host_metrics(data["duration"])}
+```
+
+### Pattern C – Fan-Out `request_all` (Server Producer → Many Client Consumers)
+
+Backend (server producer asking every tab to confirm a destructive operation):
+
+```python
+confirmations = await self.request_all(
+ "confirm_close_tab",
+ {"contextId": context_id},
+ timeout_ms=5_000,
+)
+
+for entry in confirmations:
+ self.log.info("%s responded: %s", entry["sid"], entry["results"])
+```
+
+Frontend consumer matching the envelope:
+
+```javascript
+websocket.on("confirm_close_tab", async ({ data, correlationId }) => {
+ const accepted = await showModalAndAwaitUser(data.contextId);
+ return { ok: accepted, correlationId, decision: accepted ? "close" : "stay" };
+});
+```
+
+### Pattern D – Server Reply Without Using `ack`
+
+Sometimes you want to acknowledge work immediately but stream additional updates later. Combine `request()` for the initial confirmation and `emit_to()` for follow-up events using the same correlation ID.
+
+```python
+import asyncio
+
+async def process_event(self, event_type: str, data: dict, sid: str) -> dict | None:
+ if event_type != "start_long_task":
+ return None
+
+ correlation_id = data.get("correlationId")
+ asyncio.create_task(self._run_workflow(sid, correlation_id))
+ return {"accepted": True, "correlationId": correlation_id}
+
+async def _run_workflow(self, sid: str, correlation_id: str | None):
+ for step in range(10):
+ await asyncio.sleep(1)
+ await self.emit_to(
+ sid,
+ "task_progress",
+ {"step": step, "total": 10},
+ correlation_id=correlation_id,
+ )
+```
+
+---
+
+## Metadata Flow & Envelopes
+
+### Client → Server Payload
+
+Producers send an object payload as `data` (never primitives). Request metadata such as `timeoutMs` and `correlationId` is passed as method options, not embedded in `data`.
+
+The manager validates the payload, resolves/creates `correlationId`, and passes a clean copy of `data` to handlers.
+
+### Server → Client Envelope (mandatory)
+
+```json
+{
+ "handlerId": "python.websocket_handlers.notifications.NotificationHandler",
+ "eventId": "b7e2a9cd-2857-4f7a-8bf4-12a736cb6720",
+ "correlationId": "caller-supplied-or-generated",
+ "ts": "2025-10-31T13:13:37.123Z",
+ "data": { "message": "Hello!" }
+}
+```
+
+**Guidance:**
+
+- Use `eventId` alongside frontend logging to spot duplicate deliveries or buffered flushes.
+- `correlationId` ties together the user action that triggered the event, even if multiple handlers participate.
+- `handlerId` helps you distinguish which handler produced the payload, especially when multiple handlers share the same event type.
+
+---
+
+## Diagnostics, Harness & Logging
+
+### Developer Harness
+
+- Location: `Settings → Developer → WebSocket Test Harness`.
+- Automatic mode drives emit, request, delayed request (default unlimited timeout), subscription persistence, and envelope validation. It asserts envelope metadata (handlerId, eventId, correlationId, ISO8601 timestamps) and correlation carryover.
+- Manual buttons let you trigger individual flows and inspect recent payloads.
+- The harness hides itself when `runtimeInfo.isDevelopment` is false, so production builds incur zero overhead.
+- Helper APIs (`createCorrelationId`, `validateServerEnvelope`) are exercised end to end; subscription logs record the `server_restart` broadcast emitted on first connection after a runtime restart.
+
+### WebSocket Event Console
+
+- Location: `Settings → Developer → WebSocket Event Console`.
+- Enabling capture calls `websocket.request("ws_event_console_subscribe", { requestedAt })`. The handler (`DevWebsocketTestHandler`) refuses the subscription outside development mode and registers the SID as a **diagnostic watcher** by calling `WebSocketManager.register_diagnostic_watcher`. Only connected SIDs can subscribe.
+- Disabling capture calls `websocket.request("ws_event_console_unsubscribe", {})`. Disconnecting also triggers `WebSocketManager.unregister_diagnostic_watcher`, so stranded watchers never accumulate.
+- While at least one watcher exists, the manager streams `ws_dev_console_event` envelopes (documented in `contracts/event-schemas.md`). Each payload contains:
+ - `kind`: `"inbound" | "outbound" | "lifecycle"`
+ - `eventType`, `sid`, `targets[]`, delivery/buffer flags
+ - `resultSummary` (handler counts, per-handler status, durationMs)
+ - `payloadSummary` (first few keys + byte size)
+- Lifecycle broadcasts (`ws_lifecycle_connect` / `ws_lifecycle_disconnect`) are emitted asynchronously via `broadcast(..., diagnostic=True)` so long-running handlers can’t block dispatch.
+- The modal UI exposes:
+ - Start/stop capture (explicitly controls subscription state).
+ - Resubscribe button (detach + resubscribe) to recover gracefully after Socket.IO reconnects.
+ - Clear button (resets the in-memory ring buffer).
+ - “Handled-only” toggle that filters inbound entries to ones that resolved to registered handlers or produced errors.
+- When the watcher set becomes empty the manager immediately stops streaming diagnostics, guaranteeing zero steady-state overhead outside development.
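+
+The `payloadSummary` shape (first few keys plus byte size) is simple to picture. A sketch, not the console's exact output format:
+
+```python
+import json
+
+
+def summarise_payload(data: dict, max_keys: int = 5) -> dict:
+    encoded = json.dumps(data, default=str)
+    return {
+        "keys": list(data.keys())[:max_keys],       # first few keys only
+        "byteSize": len(encoded.encode("utf-8")),
+        "truncated": len(data) > max_keys,
+    }
+
+
+print(summarise_payload({"message": "Hello!", "level": "info"}))
+```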
+
+### Instrumentation & Logging
+
+- `WebSocketManager` offloads handler execution via `DeferredTask` and may record `durationMs` when development diagnostics are active (Event Console watchers subscribed). These metrics flow into the Event Console stream (and may also appear in `request()` / `request_all()` results), keeping steady-state overhead near zero when diagnostics are closed.
+- Lifecycle events capture `connectionCount`, ISO8601 timestamps, and SID so dashboards can correlate UI behaviour with connection churn.
+- Backend logging: use `PrintStyle.debug/info/warning` and always include `handlerId`, `eventType`, `sid`, and `correlationId`. The manager already logs connection events, missing handlers, and buffer overflows.
+- Frontend logging: `websocket.debugLog()` mirrors backend debug messages but only when `window.runtimeInfo.isDevelopment` is true.
+
+### Access Logs & Transport Troubleshooting
+
+- Settings → Developer includes a persisted `uvicorn_access_logs_enabled` switch. When it is on, `run_ui.py` turns on Uvicorn access logs so transport issues (CORS, handshake failures) can be traced.
+- The long-standing `websocket_server_restart_enabled` switch (same section) controls whether newly connected clients receive the `server_restart` broadcast that carries `runtimeId` metadata.
+
+### Common Issues
+
+1. **`CONNECTION_NOT_FOUND`** – `emit_to` called with an SID that never existed or expired long ago. Use `get_sids_for_user` before emitting or guard on connection presence.
+2. **Timeout Rejections** – `request()` and `request_all()` reject only when the transport times out, not when a handler takes too long. Inspect the returned result arrays for `TIMEOUT` entries and consider increasing `timeoutMs`.
+3. **Origin Rejected** – the Socket.IO handshake was rejected because the `Origin` header did not match the expected UI origin. Ensure you access the UI and the WebSocket endpoint on the same scheme/host/port, and verify any reverse proxy preserves the `Origin` header.
+4. **Diagnostics Subscriptions Failing** – only available in development mode and for connected SIDs. Verify the browser tab still holds an active session and that `window.runtimeInfo.isDevelopment` is true before opening the modal.
+
+---
+
+## Best Practices Checklist
+
+- [ ] Always validate inbound payloads in `process_event` (required fields, type constraints, length limits).
+- [ ] Propagate `correlationId` through multi-step workflows so logs and envelopes align.
+- [ ] Respect the 50 MB payload cap; prefer HTTP + polling for bulk data transfers.
+- [ ] Ensure long-running operations emit progress via `emit_to` or switch to an async task with periodic updates.
+- [ ] Buffer-sensitive actions (`emit_to`) should handle `ConnectionNotFoundError` from unknown SIDs gracefully.
+- [ ] When adding new handlers, update the developer harness if new scenarios need coverage.
+- [ ] Keep `PrintStyle` logs meaningful—include `handlerId`, `eventType`, `sid`, and `correlationId`.
+- [ ] In Alpine components, call `websocket.off()` during teardown to avoid duplicate subscriptions.
+
+---
+
+## Quick Reference Tables
+
+### Operation Matrix
+
+| Direction | API | Ack? | Filters | Notes |
+|-----------|-----|------|---------|-------|
+| Client → Server | `emit(event, data, { correlationId? })` | No | None | Fire-and-forget. |
+| Client → Server | `request(event, data, { timeoutMs?, correlationId? })` | Yes (`{ correlationId, results[] }`) | None | Aggregates per handler. Timeout entries appear inside `results`. |
+| Server → Client | `emit_to(sid, ...)` | No | None | Raises `ConnectionNotFoundError` for unknown `sid`. Buffers if disconnected. |
+| Server → Client | `broadcast(...)` | No | `exclude_sids` only | Iterates over current connections; uses the same envelope as `emit_to`. |
+| Server → Client | `request(...)` | Yes (`{ correlationId, results[] }`) | None | Equivalent of client `request` but targeted at one SID from the server. |
+| Server → Client | `request_all(...)` | Yes (`[{ sid, correlationId, results[] }]`) | None | Server-initiated fan-out. |
+
+### Metadata Cheat Sheet
+
+| Field | Produced By | Guarantees |
+|-------|-------------|------------|
+| `correlationId` | Manager | Present on every response/envelope. Caller-supplied ID is preserved; otherwise manager generates UUIDv4 hex. |
+| `eventId` | Manager | Unique UUIDv4 per server→client delivery. Helpful for dedup / auditing. |
+| `handlerId` | Handler / Manager | Deterministic value `module.Class`. Used for result aggregation and logging. |
+| `ts` | Manager | ISO8601 UTC with millisecond precision. Replaces `+00:00` with `Z`. |
+| `results[]` | Manager | Array of `{ handlerId, ok, data?, error? }`. Errors include `code`, `error`, and optional `details`. |
+
+---
+
+## Further Reading
+
+- **QuickStart** – [`specs/003-websocket-event-handlers/quickstart.md`](../specs/003-websocket-event-handlers/quickstart.md) for a step-by-step introduction.
+- **Contracts** – Backend, frontend, schema, and security contracts define the canonical API surface:
+ - [`websocket-handler-interface.md`](../specs/003-websocket-event-handlers/contracts/websocket-handler-interface.md)
+ - [`frontend-api.md`](../specs/003-websocket-event-handlers/contracts/frontend-api.md)
+ - [`event-schemas.md`](../specs/003-websocket-event-handlers/contracts/event-schemas.md)
+ - [`security-contract.md`](../specs/003-websocket-event-handlers/contracts/security-contract.md)
+- **Implementation Reference** – Inspect `python/helpers/websocket_manager.py`, `python/helpers/websocket.py`, `webui/js/websocket.js`, and the developer harness in `webui/components/settings/developer/websocket-test-store.js` for concrete examples.
+
+> **Tip:** When extending the infrastructure (e.g., adding new metadata), start by updating the contracts, sync the manager/frontend helpers, and then document the change here so producers and consumers stay in lockstep.
+
+## Error Codes Registry (Draft for Phase 6)
+
+The WebSocket stack standardises backend error codes returned in `RequestResultItem.error.code`. This registry documents the currently used codes and their intended meaning. Client and server implementations should reference these values verbatim (UPPER_SNAKE_CASE).
+
+| Code | Scope | Meaning | Typical Remediation | Example Payload |
+|------|-------|---------|---------------------|-----------------|
+| `NO_HANDLERS` | Manager routing | No handler is registered for the requested `eventType`. | Register a handler for the event or correct the event name. | `{ "handlerId": "WebSocketManager", "ok": false, "error": { "code": "NO_HANDLERS", "error": "No handler for 'missing'" } }` |
+| `TIMEOUT` | Aggregated or single request | The request exceeded `timeoutMs`. | Increase `timeoutMs`, reduce handler processing time, or split work. | `{ "handlerId": "ExampleHandler", "ok": false, "error": { "code": "TIMEOUT", "error": "Request timeout" } }` |
+| `CONNECTION_NOT_FOUND` | Single‑sid request | Target `sid` is not connected/known. | Use an active `sid` or retry after reconnect. | `{ "handlerId": "WebSocketManager", "ok": false, "error": { "code": "CONNECTION_NOT_FOUND", "error": "Connection 'sid-123' not found" } }` |
+| `HARNESS_UNKNOWN_EVENT` | Developer harness | Harness test handler received an unsupported event name. | Update harness sources or disable the step before running automation. | `{ "handlerId": "python.websocket_handlers.dev_websocket_test_handler.DevWebsocketTestHandler", "ok": false, "error": { "code": "HARNESS_UNKNOWN_EVENT", "error": "Unhandled event", "details": "ws_tester_foo" } }` |
+
+Notes
+- Error payload shape follows the contract documented in `contracts/event-schemas.md` (`RequestResultItem.error`).
+- Codes are case‑sensitive. Use exactly as listed.
+- Future codes will be appended here and referenced by inline docstrings/JSDoc.
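+
+Construction points such as `_build_error_result` (referenced under IDE hints below) presumably centralise this shape. A sketch consistent with the registry, with an assumed signature:
+
+```python
+def build_error_result(
+    handler_id: str, code: str, message: str, details: str | None = None
+) -> dict:
+    # Codes are case-sensitive UPPER_SNAKE_CASE values from the registry above.
+    error: dict = {"code": code, "error": message}
+    if details is not None:
+        error["details"] = details
+    return {"handlerId": handler_id, "ok": False, "error": error}
+
+
+result = build_error_result("WebSocketManager", "NO_HANDLERS", "No handler for 'missing'")
+```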
+
+### Client-Side Error Codes (Draft)
+
+The frontend can originate errors during validation, connection, or request execution. Today these surface as thrown exceptions/promise rejections (not as `RequestResultItem`). When server→client request/ack support lands, these codes will also be serialized into `RequestResultItem.error.code` for protocol symmetry.
+
+| Code | Scope | Current Delivery | Meaning | Typical Remediation | Example |
+|------|-------|------------------|---------|---------------------|---------|
+| `VALIDATION_ERROR` | Producer options / payload | Exception (throw) | Invalid options (e.g., bad `timeoutMs`/`correlationId`) or non-object payload | Fix caller options and payload shapes | `new Error("timeoutMs must be a non-negative number")` |
+| `PAYLOAD_TOO_LARGE` | Size precheck (50MB cap) | Exception (throw) | Client precheck rejects payloads exceeding cap before emit | Reduce payload or chunk via HTTP; keep binaries off WS | `new Error("Payload size exceeds maximum (.. > .. bytes)")` |
+| `NOT_CONNECTED` | Socket status | Exception (throw) | Auto-connect could not establish a session (user logged out, server offline, handshake rejected) | Check login state, server availability, and Origin policy; optional `await websocket.connect()` for diagnostics | `new Error("Not connected")` |
+| `REQUEST_TIMEOUT` | request() | Reserved (not thrown) | Timeouts are represented inside `results[]` as `error.code="TIMEOUT"` (the Promise resolves). | Inspect `results[]` for `TIMEOUT` items and handle in UI. | N/A |
+| `CONNECT_ERROR` | Socket connect_error | Exception (throw/log) | Transport/handshake failure | Check server availability, CORS, or network | `new Error("WebSocket connection failed: ...")` |
+
+Notes
+- These are currently local exceptions, not part of the aggregated results payload. Calling code should `try/catch` or handle promise rejections.
+- When server→client request/ack is introduced, the same codes will be serialized into `RequestResultItem.error.code` to maintain symmetry with backend codes.
+- Prefer branching on `code` when available; avoid coupling to full message strings.
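+
+A minimal handling sketch, assuming a `websocket.request()` helper that resolves with the aggregated envelope (the canonical API lives in `frontend-api.md`; names here are illustrative):
+
+```javascript
+// Hypothetical caller: local errors surface as rejections; TIMEOUT arrives inside results[].
+async function sendWithHandling(eventType, payload) {
+  try {
+    const { results } = await websocket.request(eventType, payload, { timeoutMs: 5000 });
+    const timedOut = results.filter((r) => !r.ok && r.error?.code === "TIMEOUT");
+    if (timedOut.length) {
+      // The Promise resolved; timeouts are per-result items, not thrown errors.
+      console.warn(`${timedOut.length} handler(s) timed out`);
+    }
+    return results;
+  } catch (err) {
+    // VALIDATION_ERROR / PAYLOAD_TOO_LARGE / NOT_CONNECTED / CONNECT_ERROR surface here today.
+    console.error("Request failed before or at emit:", err.message);
+    throw err;
+  }
+}
+```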
+
+### IDE Hints (Non‑enforcing)
+
+To surface recognized codes without adding toolchain dependencies, the front‑end can use a JSDoc union type near the helper exports:
+
+```javascript
+/** @typedef {('NO_HANDLERS'|'TIMEOUT'|'CONNECTION_NOT_FOUND')} WsErrorCode */
+```
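+
+The typedef can then annotate call sites without any build-time enforcement, for example:
+
+```javascript
+/**
+ * @param {WsErrorCode} code
+ * @returns {boolean} whether the code indicates a routing problem
+ */
+function isRoutingError(code) {
+  return code === "NO_HANDLERS" || code === "CONNECTION_NOT_FOUND";
+}
+```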
+
+The back‑end can reference this registry via concise docstrings at error construction points (e.g., `_build_error_result`) to improve discoverability.
+
+---
+
+## Phase 6 – Registry & Helper Work Status
+
+Current status
+- This registry table is drafted and linked; it documents codes already produced by the manager/helpers today.
+
+Remaining work (tracked in Phase 6 tasks)
+- T148: Ensure the registry is complete and cross‑referenced from comments/docstrings (backend) and JSDoc typedefs (frontend). No new linter/tooling.
+- T144: Reference the registry from contracts and quickstart examples; align all examples to documented codes.
+- T141/T143: Add/adjust tests to assert known codes only in helper/manager paths.
+- T145–T147: Ensure the harness logs/validates codes in envelopes/results as part of the automatic and manual suites.
+
+Related references
+- [`event-schemas.md`](../specs/003-websocket-event-handlers/contracts/event-schemas.md)
+- [`websocket-handler-interface.md`](../specs/003-websocket-event-handlers/contracts/websocket-handler-interface.md)
+- [`frontend-api.md`](../specs/003-websocket-event-handlers/contracts/frontend-api.md)
diff --git a/python/api/api_log_get.py b/python/api/api_log_get.py
index 8111dbea5c..e4042c5b25 100644
--- a/python/api/api_log_get.py
+++ b/python/api/api_log_get.py
@@ -55,7 +55,7 @@ async def process(self, input: dict, request: Request) -> dict | Response:
"returned_items": len(log_items),
"start_position": start_pos,
"progress": context.log.progress,
- "progress_active": context.log.progress_active,
+ "progress_active": bool(context.log.progress_active),
"items": log_items
}
}
diff --git a/python/api/chat_create.py b/python/api/chat_create.py
index f73f3416d8..46139aeb5c 100644
--- a/python/api/chat_create.py
+++ b/python/api/chat_create.py
@@ -12,7 +12,7 @@ async def process(self, input: Input, request: Request) -> Output:
# context instance - get or create
current_context = AgentContext.get(current_ctxid)
-
+
# get/create new context
new_context = self.use_context(new_ctxid)
@@ -25,6 +25,10 @@ async def process(self, input: Input, request: Request) -> Output:
if current_data_2:
new_context.set_output_data(projects.CONTEXT_DATA_KEY_PROJECT, current_data_2)
+ # New context should appear in other tabs' chat lists via state_push.
+ from python.helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="api.chat_create.CreateChat")
+
return {
"ok": True,
"ctxid": new_context.id,
diff --git a/python/api/chat_remove.py b/python/api/chat_remove.py
index 671e43d9ea..ee6f52aef3 100644
--- a/python/api/chat_remove.py
+++ b/python/api/chat_remove.py
@@ -25,6 +25,10 @@ async def process(self, input: Input, request: Request) -> Output:
for task in tasks:
await scheduler.remove_task_by_uuid(task.uuid)
+ # Context removal affects global chat/task lists in all tabs.
+ from python.helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="api.chat_remove.RemoveChat")
+
return {
"message": "Context removed.",
}
diff --git a/python/api/chat_reset.py b/python/api/chat_reset.py
index 668b08e268..92e8dc723e 100644
--- a/python/api/chat_reset.py
+++ b/python/api/chat_reset.py
@@ -18,6 +18,10 @@ async def process(self, input: Input, request: Request) -> Output:
persist_chat.save_tmp_chat(context)
persist_chat.remove_msg_files(ctxid)
+ # Reset updates context metadata (log guid/version) and must refresh other tabs' lists.
+ from python.helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="api.chat_reset.Reset")
+
return {
"message": "Agent restarted.",
}
diff --git a/python/api/csrf_token.py b/python/api/csrf_token.py
index f4d1d63c0f..fabe1e29a9 100644
--- a/python/api/csrf_token.py
+++ b/python/api/csrf_token.py
@@ -25,7 +25,6 @@ def requires_csrf(cls) -> bool:
return False
async def process(self, input: Input, request: Request) -> Output:
-
# check for allowed origin to prevent dns rebinding attacks
origin_check = await self.check_allowed_origin(request)
if not origin_check["ok"]:
@@ -38,7 +37,6 @@ async def process(self, input: Input, request: Request) -> Output:
if "csrf_token" not in session:
session["csrf_token"] = secrets.token_urlsafe(32)
- # return the csrf token and runtime id
return {
"ok": True,
"token": session["csrf_token"],
@@ -119,7 +117,7 @@ def get_default_allowed_origins(self) -> list[str]:
def initialize_allowed_origins(self, request: Request):
"""
If A0 is hosted on a server, add the first visit origin to ALLOWED_ORIGINS.
- This simplifies deployment process as users can access their new instance without
+        This simplifies the deployment process, as users can access their new instance without
additional setup while keeping it secure.
"""
# dotenv value is already set, do nothing
@@ -144,5 +142,3 @@ def initialize_allowed_origins(self, request: Request):
# if not, add it to the allowed origins
allowed_origins.append(req_origin)
dotenv.save_dotenv_value(ALLOWED_ORIGINS_KEY, ",".join(allowed_origins))
-
-
\ No newline at end of file
diff --git a/python/api/notifications_history.py b/python/api/notifications_history.py
index 1b9e9f3d23..83629288fc 100644
--- a/python/api/notifications_history.py
+++ b/python/api/notifications_history.py
@@ -13,8 +13,9 @@ async def process(self, input: dict, request: Request) -> dict | Response:
notification_manager = AgentContext.get_notification_manager()
# Return all notifications for history modal
+ notifications = notification_manager.output_all()
return {
- "notifications": [n.output() for n in notification_manager.notifications],
+ "notifications": notifications,
"guid": notification_manager.guid,
- "count": len(notification_manager.notifications),
+ "count": len(notifications),
}
diff --git a/python/api/notifications_mark_read.py b/python/api/notifications_mark_read.py
index 6f3d182d83..4a25f167a8 100644
--- a/python/api/notifications_mark_read.py
+++ b/python/api/notifications_mark_read.py
@@ -21,15 +21,11 @@ async def process(self, input: dict, request: Request) -> dict | Response:
if not notification_ids:
return {"success": False, "error": "No notification IDs provided"}
+ if not isinstance(notification_ids, list):
+ return {"success": False, "error": "notification_ids must be a list"}
+
# Mark specific notifications as read
- marked_count = 0
- for notification_id in notification_ids:
- # Find notification by ID and mark as read
- for notification in notification_manager.notifications:
- if notification.id == notification_id and not notification.read:
- notification.mark_read()
- marked_count += 1
- break
+ marked_count = notification_manager.mark_read_by_ids(notification_ids)
return {
"success": True,
diff --git a/python/api/poll.py b/python/api/poll.py
index dbe7105c66..b6989e55d2 100644
--- a/python/api/poll.py
+++ b/python/api/poll.py
@@ -1,125 +1,14 @@
from python.helpers.api import ApiHandler, Request, Response
-from agent import AgentContext, AgentContextType
-
-from python.helpers.task_scheduler import TaskScheduler
-from python.helpers.localization import Localization
-from python.helpers.dotenv import get_dotenv_value
+from python.helpers.state_snapshot import build_snapshot
class Poll(ApiHandler):
async def process(self, input: dict, request: Request) -> dict | Response:
- ctxid = input.get("context", "")
- from_no = input.get("log_from", 0)
- notifications_from = input.get("notifications_from", 0)
-
- # Get timezone from input (default to dotenv default or UTC if not provided)
- timezone = input.get("timezone", get_dotenv_value("DEFAULT_USER_TIMEZONE", "UTC"))
- Localization.get().set_timezone(timezone)
-
- # context instance - get or create only if ctxid is provided
- if ctxid:
- try:
- context = self.use_context(ctxid, create_if_not_exists=False)
- except Exception as e:
- context = None
- else:
- context = None
-
- # Get logs only if we have a context
- logs = context.log.output(start=from_no) if context else []
-
- # Get notifications from global notification manager
- notification_manager = AgentContext.get_notification_manager()
- notifications = notification_manager.output(start=notifications_from)
-
- # loop AgentContext._contexts
-
- # Get a task scheduler instance
- scheduler = TaskScheduler.get()
-
- # Always reload the scheduler on each poll to ensure we have the latest task state
- # await scheduler.reload() # does not seem to be needed
-
- # loop AgentContext._contexts and divide into contexts and tasks
-
- ctxs = []
- tasks = []
- processed_contexts = set() # Track processed context IDs
-
- all_ctxs = list(AgentContext._contexts.values())
- # First, identify all tasks
- for ctx in all_ctxs:
- # Skip if already processed
- if ctx.id in processed_contexts:
- continue
-
- # Skip BACKGROUND contexts as they should be invisible to users
- if ctx.type == AgentContextType.BACKGROUND:
- processed_contexts.add(ctx.id)
- continue
-
- # Create the base context data that will be returned
- context_data = ctx.output()
-
- context_task = scheduler.get_task_by_uuid(ctx.id)
- # Determine if this is a task-dedicated context by checking if a task with this UUID exists
- is_task_context = (
- context_task is not None and context_task.context_id == ctx.id
- )
-
- if not is_task_context:
- ctxs.append(context_data)
- else:
- # If this is a task, get task details from the scheduler
- task_details = scheduler.serialize_task(ctx.id)
- if task_details:
- # Add task details to context_data with the same field names
- # as used in scheduler endpoints to maintain UI compatibility
- context_data.update({
- "task_name": task_details.get("name"), # name is for context, task_name for the task name
- "uuid": task_details.get("uuid"),
- "state": task_details.get("state"),
- "type": task_details.get("type"),
- "system_prompt": task_details.get("system_prompt"),
- "prompt": task_details.get("prompt"),
- "last_run": task_details.get("last_run"),
- "last_result": task_details.get("last_result"),
- "attachments": task_details.get("attachments", []),
- "context_id": task_details.get("context_id"),
- })
-
- # Add type-specific fields
- if task_details.get("type") == "scheduled":
- context_data["schedule"] = task_details.get("schedule")
- elif task_details.get("type") == "planned":
- context_data["plan"] = task_details.get("plan")
- else:
- context_data["token"] = task_details.get("token")
-
- tasks.append(context_data)
-
- # Mark as processed
- processed_contexts.add(ctx.id)
-
- # Sort tasks and chats by their creation date, descending
- ctxs.sort(key=lambda x: x["created_at"], reverse=True)
- tasks.sort(key=lambda x: x["created_at"], reverse=True)
-
- # data from this server
- return {
- "deselect_chat": ctxid and not context,
- "context": context.id if context else "",
- "contexts": ctxs,
- "tasks": tasks,
- "logs": logs,
- "log_guid": context.log.guid if context else "",
- "log_version": len(context.log.updates) if context else 0,
- "log_progress": context.log.progress if context else 0,
- "log_progress_active": context.log.progress_active if context else False,
- "paused": context.paused if context else False,
- "notifications": notifications,
- "notifications_guid": notification_manager.guid,
- "notifications_version": len(notification_manager.updates),
- }
+ return await build_snapshot(
+ context=input.get("context"),
+ log_from=input.get("log_from", 0),
+ notifications_from=input.get("notifications_from", 0),
+ timezone=input.get("timezone"),
+ )
diff --git a/python/extensions/monologue_end/_50_memorize_fragments.py b/python/extensions/monologue_end/_50_memorize_fragments.py
index 932c7bf063..df8c25c156 100644
--- a/python/extensions/monologue_end/_50_memorize_fragments.py
+++ b/python/extensions/monologue_end/_50_memorize_fragments.py
@@ -26,8 +26,23 @@ async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
# memorize in background
task = asyncio.create_task(self.memorize(loop_data, log_item))
+ # Ensure progress bar resets after background work completes when the chat is idle.
+ task.add_done_callback(lambda _task, owner_no=log_item.no: self._reset_progress_if_idle(owner_no))
return task
+ def _reset_progress_if_idle(self, owner_no: int) -> None:
+ try:
+ ctx = self.agent.context
+ if ctx and ctx.streaming_agent is None:
+ # Only reset if this background task is still the source of the current progress.
+ # This prevents clobbering progress from a newer operation that started meanwhile.
+ if getattr(ctx.log, "progress_no", None) != owner_no:
+ return
+ ctx.log.set_initial_progress()
+ except Exception:
+ # Best-effort only: do not let background completion callbacks raise.
+ return
+
async def memorize(self, loop_data: LoopData, log_item: LogItem, **kwargs):
set = settings.get_settings()
@@ -98,7 +113,7 @@ async def log_callback(content):
txt = f"{memory}"
if set["memory_memorize_consolidation"]:
-
+
try:
# Use intelligent consolidation system
from python.helpers.memory_consolidation import create_memory_consolidator
@@ -183,7 +198,7 @@ async def log_callback(content):
)
if rem:
log_item.stream(result=f"\nReplaced {len(rem)} previous memories.")
-
+
diff --git a/python/extensions/monologue_end/_51_memorize_solutions.py b/python/extensions/monologue_end/_51_memorize_solutions.py
index a66cd4e15d..ae2ad3364b 100644
--- a/python/extensions/monologue_end/_51_memorize_solutions.py
+++ b/python/extensions/monologue_end/_51_memorize_solutions.py
@@ -17,7 +17,7 @@ async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
if not set["memory_memorize_enabled"]:
return
-
+
# show full util message
log_item = self.agent.context.log.log(
type="util",
@@ -26,8 +26,23 @@ async def execute(self, loop_data: LoopData = LoopData(), **kwargs):
# memorize in background
task = asyncio.create_task(self.memorize(loop_data, log_item))
+ # Ensure progress bar resets after background work completes when the chat is idle.
+ task.add_done_callback(lambda _task, owner_no=log_item.no: self._reset_progress_if_idle(owner_no))
return task
+ def _reset_progress_if_idle(self, owner_no: int) -> None:
+ try:
+ ctx = self.agent.context
+ if ctx and ctx.streaming_agent is None:
+ # Only reset if this background task is still the source of the current progress.
+ # This prevents clobbering progress from a newer operation that started meanwhile.
+ if getattr(ctx.log, "progress_no", None) != owner_no:
+ return
+ ctx.log.set_initial_progress()
+ except Exception:
+ # Best-effort only: do not let background completion callbacks raise.
+ return
+
async def memorize(self, loop_data: LoopData, log_item: LogItem, **kwargs):
set = settings.get_settings()
diff --git a/python/extensions/response_stream/_20_live_response.py b/python/extensions/response_stream/_20_live_response.py
index f9455ed82e..34ef4ff2a2 100644
--- a/python/extensions/response_stream/_20_live_response.py
+++ b/python/extensions/response_stream/_20_live_response.py
@@ -1,9 +1,5 @@
-from python.helpers import persist_chat, tokens
from python.helpers.extension import Extension
from agent import LoopData
-import asyncio
-from python.helpers.log import LogItem
-from python.helpers import log
class LiveResponse(Extension):
@@ -11,18 +7,16 @@ class LiveResponse(Extension):
async def execute(
self,
loop_data: LoopData = LoopData(),
- text: str = "",
- parsed: dict = {},
+ _text: str = "",
+ parsed: dict | None = None,
**kwargs,
):
try:
- if (
- not "tool_name" in parsed
- or parsed["tool_name"] != "response"
- or "tool_args" not in parsed
- or "text" not in parsed["tool_args"]
- or not parsed["tool_args"]["text"]
- ):
+ parsed = parsed if isinstance(parsed, dict) else {}
+ tool_args = parsed.get("tool_args")
+ is_response = parsed.get("tool_name") == "response"
+ has_text = isinstance(tool_args, dict) and bool(tool_args.get("text"))
+ if not (is_response and has_text):
return # not a response
# create log message and store it in loop data temporary params
@@ -37,5 +31,5 @@ async def execute(
# update log message
log_item = loop_data.params_temporary["log_item_response"]
log_item.update(content=parsed["tool_args"]["text"])
- except Exception as e:
+ except Exception:
pass
diff --git a/python/helpers/api.py b/python/helpers/api.py
index 6c90c6e566..9593d98c98 100644
--- a/python/helpers/api.py
+++ b/python/helpers/api.py
@@ -10,12 +10,14 @@
from python.helpers.errors import format_error
from werkzeug.serving import make_server
+ThreadLockType = Union[threading.Lock, threading.RLock]
+
Input = dict
Output = Union[Dict[str, Any], Response, TypedDict] # type: ignore
class ApiHandler:
- def __init__(self, app: Flask, thread_lock: threading.Lock):
+ def __init__(self, app: Flask, thread_lock: ThreadLockType):
self.app = app
self.thread_lock = thread_lock
diff --git a/python/helpers/log.py b/python/helpers/log.py
index a799666588..4116b83419 100644
--- a/python/helpers/log.py
+++ b/python/helpers/log.py
@@ -1,19 +1,43 @@
-from dataclasses import dataclass, field
+import copy
import json
-from typing import Any, Literal, Optional, Dict, TypeVar, TYPE_CHECKING
-
-T = TypeVar("T")
+import threading
import uuid
-from collections import OrderedDict # Import OrderedDict
-from python.helpers.strings import truncate_text_by_ratio
-import copy
-from typing import TypeVar
+from collections import OrderedDict
+from dataclasses import dataclass
+from typing import Any, Literal, Optional, TYPE_CHECKING, TypeVar, cast
+
from python.helpers.secrets import get_secrets_manager
+from python.helpers.strings import truncate_text_by_ratio
if TYPE_CHECKING:
from agent import AgentContext
+
+_MARK_DIRTY_ALL = None
+_MARK_DIRTY_FOR_CONTEXT = None
+
+
+def _lazy_mark_dirty_all(*, reason: str | None = None) -> None:
+ # Lazy import to avoid circular import at module load time (AgentContext -> Log).
+ global _MARK_DIRTY_ALL
+ if _MARK_DIRTY_ALL is None:
+ from python.helpers.state_monitor_integration import mark_dirty_all
+
+ _MARK_DIRTY_ALL = mark_dirty_all
+ _MARK_DIRTY_ALL(reason=reason)
+
+
+def _lazy_mark_dirty_for_context(context_id: str, *, reason: str | None = None) -> None:
+ # Lazy import to avoid circular import at module load time (AgentContext -> Log).
+ global _MARK_DIRTY_FOR_CONTEXT
+ if _MARK_DIRTY_FOR_CONTEXT is None:
+ from python.helpers.state_monitor_integration import mark_dirty_for_context
+
+ _MARK_DIRTY_FOR_CONTEXT = mark_dirty_for_context
+ _MARK_DIRTY_FOR_CONTEXT(context_id, reason=reason)
+
+
T = TypeVar("T")
Type = Literal[
@@ -66,14 +90,14 @@ def _truncate_value(val: T) -> T:
v = val[k]
del val[k]
val[_truncate_key(k)] = _truncate_value(v)
- return val
+ return cast(T, val)
# If list or tuple, recursively truncate each item
if isinstance(val, list):
for i in range(len(val)):
val[i] = _truncate_value(val[i])
- return val
+ return cast(T, val)
if isinstance(val, tuple):
- return tuple(_truncate_value(x) for x in val) # type: ignore
+ return cast(T, tuple(_truncate_value(x) for x in val))
# Convert non-str values to json for consistent length measurement
if isinstance(val, str):
@@ -91,7 +115,7 @@ def _truncate_value(val: T) -> T:
removed = len(raw) - VALUE_MAX_LEN
replacement = f"\n\n<< {removed} Characters hidden >>\n\n"
truncated = truncate_text_by_ratio(raw, VALUE_MAX_LEN, replacement, ratio=0.3)
- return truncated
+ return cast(T, truncated)
def _truncate_content(text: str | None, type: Type) -> str:
@@ -116,9 +140,6 @@ def _truncate_content(text: str | None, type: Type) -> str:
return truncated
-
-
-
@dataclass
class LogItem:
log: "Log"
@@ -187,10 +208,14 @@ def output(self):
class Log:
def __init__(self):
- self.context: "AgentContext|None" = None # set from outside
+ self._lock = threading.RLock()
+ self.context: "AgentContext|None" = None # set from outside
self.guid: str = str(uuid.uuid4())
self.updates: list[int] = []
self.logs: list[LogItem] = []
+ self.progress: str = ""
+ self.progress_no: int = 0
+ self.progress_active: bool = False
self.set_initial_progress()
def log(
@@ -204,16 +229,17 @@ def log(
id: Optional[str] = None,
**kwargs,
) -> LogItem:
+ with self._lock:
+ # add a minimal item to the log
+ item = LogItem(
+ log=self,
+ no=len(self.logs),
+ type=type,
+ )
+ self.logs.append(item)
- # add a minimal item to the log
- item = LogItem(
- log=self,
- no=len(self.logs),
- type=type,
- )
- self.logs.append(item)
-
- # and update it (to have just one implementation)
+ # Update outside the lock - the heavy masking/truncation work should not hold
+ # the lock; we only need locking while mutating shared arrays/fields.
self._update_item(
no=item.no,
type=type,
@@ -223,8 +249,11 @@ def log(
temp=temp,
update_progress=update_progress,
id=id,
+ notify_state_monitor=False,
**kwargs,
)
+
+ self._notify_state_monitor()
return item
def _update_item(
@@ -237,88 +266,144 @@ def _update_item(
temp: bool | None = None,
update_progress: ProgressUpdate | None = None,
id: Optional[str] = None,
+ notify_state_monitor: bool = True,
**kwargs,
):
- item = self.logs[no]
-
- if id is not None:
- item.id = id
-
- if type is not None:
- item.type = type
-
- if temp is not None:
- item.temp = temp
-
- if update_progress is not None:
- item.update_progress = update_progress
-
+ # Capture the effective type for truncation without holding the lock during
+ # masking/truncation work.
+ with self._lock:
+ current_type = self.logs[no].type
+ type_for_truncation = type if type is not None else current_type
- # adjust all content before processing
+ heading_out: str | None = None
if heading is not None:
- heading = self._mask_recursive(heading)
- heading = _truncate_heading(heading)
- item.heading = heading
+ heading_out = _truncate_heading(self._mask_recursive(heading))
+
+ content_out: str | None = None
if content is not None:
- content = self._mask_recursive(content)
- content = _truncate_content(content, item.type)
- item.content = content
+ content_out = _truncate_content(self._mask_recursive(content), type_for_truncation)
+
+ kvps_out: OrderedDict | None = None
if kvps is not None:
- kvps = OrderedDict(copy.deepcopy(kvps))
- kvps = self._mask_recursive(kvps)
- kvps = _truncate_value(kvps)
- item.kvps = kvps
- elif item.kvps is None:
- item.kvps = OrderedDict()
- if kwargs:
- kwargs = copy.deepcopy(kwargs)
- kwargs = self._mask_recursive(kwargs)
- item.kvps.update(kwargs)
+ kvps_out_tmp = OrderedDict(copy.deepcopy(kvps))
+ kvps_out_tmp = self._mask_recursive(kvps_out_tmp)
+ kvps_out_tmp = _truncate_value(kvps_out_tmp)
+ kvps_out = OrderedDict(kvps_out_tmp)
- self.updates += [item.no]
- self._update_progress_from_item(item)
+ kwargs_out: dict | None = None
+ if kwargs:
+ kwargs_out = copy.deepcopy(kwargs)
+ kwargs_out = self._mask_recursive(kwargs_out)
+
+ with self._lock:
+ item = self.logs[no]
+
+ if id is not None:
+ item.id = id
+
+ if type is not None:
+ item.type = type
+
+ if temp is not None:
+ item.temp = temp
+
+ if update_progress is not None:
+ item.update_progress = update_progress
+
+ if heading_out is not None:
+ item.heading = heading_out
+
+ if content_out is not None:
+ item.content = content_out
+
+ if kvps_out is not None:
+ item.kvps = kvps_out
+ elif item.kvps is None:
+ item.kvps = OrderedDict()
+
+ if kwargs_out:
+ if item.kvps is None:
+ item.kvps = OrderedDict()
+ item.kvps.update(kwargs_out)
+
+ self.updates.append(item.no)
+
+ if item.heading and item.update_progress != "none":
+ if item.no >= self.progress_no:
+ self.progress = item.heading
+ self.progress_no = (
+ item.no if item.update_progress == "persistent" else -1
+ )
+ self.progress_active = True
+ if notify_state_monitor:
+ self._notify_state_monitor_for_context_update()
+
+ def _notify_state_monitor(self) -> None:
+ ctx = self.context
+ if not ctx:
+ return
+ # Logs update both the active chat stream (sid-bound) and the global chats list
+ # (context metadata like last_message/log_version). Broadcast so all tabs refresh
+ # their chat/task lists without leaking logs (logs are still scoped per-sid).
+ _lazy_mark_dirty_all(reason="log.Log._notify_state_monitor")
+
+ def _notify_state_monitor_for_context_update(self) -> None:
+ ctx = self.context
+ if not ctx:
+ return
+ # Log item updates only need to refresh the active chat stream for any sid
+ # currently projecting this context. Avoid global fanout at high frequency.
+ _lazy_mark_dirty_for_context(ctx.id, reason="log.Log._update_item")
def set_progress(self, progress: str, no: int = 0, active: bool = True):
progress = self._mask_recursive(progress)
progress = _truncate_progress(progress)
- self.progress = progress
- if not no:
- no = len(self.logs)
- self.progress_no = no
- self.progress_active = active
+ changed = False
+ ctx = self.context
+ with self._lock:
+ prev_progress = self.progress
+ prev_active = self.progress_active
+
+ self.progress = progress
+ if not no:
+ no = len(self.logs)
+ self.progress_no = no
+ self.progress_active = active
+
+ changed = self.progress != prev_progress or self.progress_active != prev_active
+
+ if changed and ctx:
+ # Progress changes are included in every snapshot, but push sync requires a
+ # dirty mark even when no log items changed.
+ _lazy_mark_dirty_for_context(ctx.id, reason="log.Log.set_progress")
def set_initial_progress(self):
self.set_progress("Waiting for input", 0, False)
def output(self, start=None, end=None):
- if start is None:
- start = 0
- if end is None:
- end = len(self.updates)
+ with self._lock:
+ if start is None:
+ start = 0
+ if end is None:
+ end = len(self.updates)
+ updates = self.updates[start:end]
+ logs = list(self.logs)
out = []
seen = set()
- for update in self.updates[start:end]:
- if update not in seen:
- out.append(self.logs[update].output())
+ for update in updates:
+ if update not in seen and update < len(logs):
+ out.append(logs[update].output())
seen.add(update)
-
return out
def reset(self):
- self.guid = str(uuid.uuid4())
- self.updates = []
- self.logs = []
+ with self._lock:
+ self.guid = str(uuid.uuid4())
+ self.updates = []
+ self.logs = []
self.set_initial_progress()
- def _update_progress_from_item(self, item: LogItem):
- if item.heading and item.update_progress != "none":
- if item.no >= self.progress_no:
- self.set_progress(
- item.heading,
- (item.no if item.update_progress == "persistent" else -1),
- )
-
def _mask_recursive(self, obj: T) -> T:
"""Recursively mask secrets in nested objects."""
try:
@@ -333,13 +418,13 @@ def _mask_recursive(self, obj: T) -> T:
# print(f"Context ID mismatch: {self_id} != {current_id}")
if isinstance(obj, str):
- return secrets_mgr.mask_values(obj)
+ return cast(Any, secrets_mgr.mask_values(obj))
elif isinstance(obj, dict):
return {k: self._mask_recursive(v) for k, v in obj.items()} # type: ignore
elif isinstance(obj, list):
return [self._mask_recursive(item) for item in obj] # type: ignore
else:
return obj
- except Exception as _e:
+ except Exception:
# If masking fails, return original object
- return obj
\ No newline at end of file
+ return obj
diff --git a/python/helpers/mcp_server.py b/python/helpers/mcp_server.py
index 3c0308ed9c..d41fdeb725 100644
--- a/python/helpers/mcp_server.py
+++ b/python/helpers/mcp_server.py
@@ -1,9 +1,11 @@
import os
+import asyncio
from typing import Annotated, Literal, Union
from urllib.parse import urlparse
from openai import BaseModel
from pydantic import Field
-from fastmcp import FastMCP # type: ignore
+import fastmcp
+from fastmcp import FastMCP
import contextvars
from agent import AgentContext, AgentContextType, UserMessage
@@ -15,7 +17,8 @@
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.types import ASGIApp, Receive, Scope, Send
-from fastmcp.server.http import create_sse_app # type: ignore
+from fastmcp.server.http import create_sse_app, create_base_app, build_resource_metadata_url # type: ignore
+from starlette.routing import Mount # type: ignore
from starlette.requests import Request
import threading
@@ -319,37 +322,39 @@ def reconfigure(self, token: str):
message_path = f"/t-{self.token}/messages/"
# Update settings in the MCP server instance if provided
- mcp_server.settings.message_path = message_path
- mcp_server.settings.sse_path = sse_path
+ # Keep FastMCP settings synchronized so downstream helpers that read these
+ # values (including deprecated accessors) resolve the runtime paths.
+ fastmcp.settings.message_path = message_path
+ fastmcp.settings.sse_path = sse_path
+ fastmcp.settings.streamable_http_path = http_path
# Create new MCP apps with updated settings
with self._lock:
+ middleware = [Middleware(BaseHTTPMiddleware, dispatch=mcp_middleware)]
+
self.sse_app = create_sse_app(
server=mcp_server,
- message_path=mcp_server.settings.message_path,
- sse_path=mcp_server.settings.sse_path,
- auth_server_provider=mcp_server._auth_server_provider,
- auth_settings=mcp_server.settings.auth,
- debug=mcp_server.settings.debug,
- routes=mcp_server._additional_http_routes,
- middleware=[Middleware(BaseHTTPMiddleware, dispatch=mcp_middleware)],
+ message_path=message_path,
+ sse_path=sse_path,
+ auth=mcp_server.auth,
+ debug=fastmcp.settings.debug,
+ middleware=list(middleware),
)
- # For HTTP, we need to create a custom app since the lifespan manager
- # doesn't work properly in our Flask/Werkzeug environment
self.http_app = self._create_custom_http_app(
http_path,
- mcp_server._auth_server_provider,
- mcp_server.settings.auth,
- mcp_server.settings.debug,
- mcp_server._additional_http_routes,
+ middleware=list(middleware),
)
- def _create_custom_http_app(self, streamable_http_path, auth_server_provider, auth_settings, debug, routes):
- """Create a custom HTTP app that manages the session manager manually."""
- from fastmcp.server.http import setup_auth_middleware_and_routes, create_base_app # type: ignore
+ def _create_custom_http_app(
+ self,
+ streamable_http_path: str,
+ *,
+ middleware: list[Middleware],
+ ) -> ASGIApp:
+ """Create a Streamable HTTP app with manual session manager lifecycle."""
+
from mcp.server.streamable_http_manager import StreamableHTTPSessionManager # type: ignore
- from starlette.routing import Mount
from mcp.server.auth.middleware.bearer_auth import RequireAuthMiddleware # type: ignore
import anyio
@@ -357,9 +362,6 @@ def _create_custom_http_app(self, streamable_http_path, auth_server_provider, au
server_middleware = []
self.http_session_task_group = None
-
-
- # Create session manager
self.http_session_manager = StreamableHTTPSessionManager(
app=mcp_server._mcp_server,
event_store=None,
@@ -367,10 +369,7 @@ def _create_custom_http_app(self, streamable_http_path, auth_server_provider, au
stateless=False,
)
-
- # Custom ASGI handler that ensures task group is initialized
async def handle_streamable_http(scope, receive, send):
- # Lazy initialization of task group
if self.http_session_task_group is None:
self.http_session_task_group = anyio.create_task_group()
await self.http_session_task_group.__aenter__()
@@ -380,20 +379,25 @@ async def handle_streamable_http(scope, receive, send):
if self.http_session_manager:
await self.http_session_manager.handle_request(scope, receive, send)
- # Get auth middleware and routes
- auth_middleware, auth_routes, required_scopes = setup_auth_middleware_and_routes(
- auth_server_provider, auth_settings
- )
+ auth_provider = mcp_server.auth
- server_routes.extend(auth_routes)
- server_middleware.extend(auth_middleware)
+ if auth_provider:
+ server_routes.extend(auth_provider.get_routes(mcp_path=streamable_http_path))
+ server_middleware.extend(auth_provider.get_middleware())
+
+ resource_url = auth_provider._get_resource_url(streamable_http_path)
+ resource_metadata_url = (
+ build_resource_metadata_url(resource_url) if resource_url else None
+ )
- # Add StreamableHTTP routes with or without auth
- if auth_server_provider:
server_routes.append(
Mount(
streamable_http_path,
- app=RequireAuthMiddleware(handle_streamable_http, required_scopes),
+ app=RequireAuthMiddleware(
+ handle_streamable_http,
+ auth_provider.required_scopes,
+ resource_metadata_url,
+ ),
)
)
else:
@@ -404,18 +408,16 @@ async def handle_streamable_http(scope, receive, send):
)
)
- # Add custom routes with lowest precedence
- if routes:
- server_routes.extend(routes)
+ additional_routes = mcp_server._get_additional_http_routes()
+ if additional_routes:
+ server_routes.extend(additional_routes)
- # Add middleware
- server_middleware.append(Middleware(BaseHTTPMiddleware, dispatch=mcp_middleware))
+ server_middleware.extend(middleware)
- # Create and return the app
return create_base_app(
routes=server_routes,
middleware=server_middleware,
- debug=debug,
+ debug=fastmcp.settings.debug,
)
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
diff --git a/python/helpers/notification.py b/python/helpers/notification.py
index 1aefff7b4d..a15cb1b385 100644
--- a/python/helpers/notification.py
+++ b/python/helpers/notification.py
@@ -1,5 +1,6 @@
from dataclasses import dataclass
import uuid
+import threading
from datetime import datetime, timezone, timedelta
from enum import Enum
@@ -11,6 +12,7 @@ class NotificationType(Enum):
ERROR = "error"
PROGRESS = "progress"
+
class NotificationPriority(Enum):
NORMAL = 10
HIGH = 20
@@ -40,7 +42,7 @@ def __post_init__(self):
def mark_read(self):
self.read = True
- self.manager._update_item(self.no, read=True)
+ self.manager.update_item(self.no, read=True)
def output(self):
return {
@@ -60,6 +62,7 @@ def output(self):
class NotificationManager:
def __init__(self, max_notifications: int = 100):
+ self._lock = threading.RLock()
self.guid: str = str(uuid.uuid4())
self.updates: list[int] = []
self.notifications: list[NotificationItem] = []
@@ -90,75 +93,136 @@ def add_notification(
display_time: int = 3,
group: str = "",
) -> NotificationItem:
- # Create notification item
- item = NotificationItem(
- manager=self,
- no=len(self.notifications),
- type=NotificationType(type),
- priority=NotificationPriority(priority),
- title=title,
- message=message,
- detail=detail,
- timestamp=datetime.now(timezone.utc),
- display_time=display_time,
- group=group,
- )
-
- # Add to notifications
- self.notifications.append(item)
- self.updates.append(item.no)
-
- # Enforce limit
- self._enforce_limit()
-
+ with self._lock:
+ # Create notification item
+ item = NotificationItem(
+ manager=self,
+ no=len(self.notifications),
+ type=NotificationType(type),
+ priority=NotificationPriority(priority),
+ title=title,
+ message=message,
+ detail=detail,
+ timestamp=datetime.now(timezone.utc),
+ display_time=display_time,
+ group=group,
+ )
+
+ # Add to notifications
+ self.notifications.append(item)
+ self.updates.append(item.no)
+
+ # Enforce limit
+ self._enforce_limit()
+
+ from python.helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="notification.NotificationManager.add_notification")
return item
def _enforce_limit(self):
- if len(self.notifications) > self.max_notifications:
- # Remove oldest notifications
- to_remove = len(self.notifications) - self.max_notifications
- self.notifications = self.notifications[to_remove:]
- # Adjust notification numbers
- for i, notification in enumerate(self.notifications):
- notification.no = i
- # Adjust updates list
- self.updates = [no - to_remove for no in self.updates if no >= to_remove]
+ with self._lock:
+ if len(self.notifications) > self.max_notifications:
+ # Remove oldest notifications
+ to_remove = len(self.notifications) - self.max_notifications
+ self.notifications = self.notifications[to_remove:]
+ # Adjust notification numbers
+ for i, notification in enumerate(self.notifications):
+ notification.no = i
+ # Adjust updates list
+ self.updates = [no - to_remove for no in self.updates if no >= to_remove]
def get_recent_notifications(self, seconds: int = 30) -> list[NotificationItem]:
cutoff = datetime.now(timezone.utc) - timedelta(seconds=seconds)
- return [n for n in self.notifications if n.timestamp >= cutoff]
+ with self._lock:
+ return [n for n in self.notifications if n.timestamp >= cutoff]
def output(self, start: int | None = None, end: int | None = None) -> list[dict]:
- if start is None:
- start = 0
- if end is None:
- end = len(self.updates)
+ with self._lock:
+ if start is None:
+ start = 0
+ if end is None:
+ end = len(self.updates)
+ updates = self.updates[start:end]
+ notifications = list(self.notifications)
out = []
seen = set()
- for update in self.updates[start:end]:
- if update not in seen and update < len(self.notifications):
- out.append(self.notifications[update].output())
+ for update in updates:
+ if update not in seen and update < len(notifications):
+ out.append(notifications[update].output())
seen.add(update)
-
return out
+ def output_all(self) -> list[dict]:
+ with self._lock:
+ notifications = list(self.notifications)
+ return [n.output() for n in notifications]
+
+ def mark_read_by_ids(self, notification_ids: list[str]) -> int:
+ ids = {nid for nid in notification_ids if isinstance(nid, str) and nid.strip()}
+ if not ids:
+ return 0
+
+ changed_nos: list[int] = []
+ with self._lock:
+ for notification in self.notifications:
+ if notification.id in ids and not notification.read:
+ notification.read = True
+ changed_nos.append(notification.no)
+ if changed_nos:
+ self.updates.extend(changed_nos)
+
+ if not changed_nos:
+ return 0
+
+ from python.helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="notification.NotificationManager.mark_read_by_ids")
+ return len(changed_nos)
+
+ def update_item(self, no: int, **kwargs) -> None:
+ self._update_item(no, **kwargs)
+
def _update_item(self, no: int, **kwargs):
- if no < len(self.notifications):
- item = self.notifications[no]
- for key, value in kwargs.items():
- if hasattr(item, key):
- setattr(item, key, value)
- self.updates.append(no)
+ changed = False
+ with self._lock:
+ if no < len(self.notifications):
+ item = self.notifications[no]
+ for key, value in kwargs.items():
+ if hasattr(item, key):
+ setattr(item, key, value)
+ self.updates.append(no)
+ changed = True
+
+ if not changed:
+ return
+
+ from python.helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="notification.NotificationManager._update_item")
def mark_all_read(self):
- for notification in self.notifications:
- notification.read = True
+ changed_nos: list[int] = []
+ with self._lock:
+ for notification in self.notifications:
+ if not notification.read:
+ notification.read = True
+ changed_nos.append(notification.no)
+ if changed_nos:
+ self.updates.extend(changed_nos)
+
+ if not changed_nos:
+ return
+
+ from python.helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="notification.NotificationManager.mark_all_read")
def clear_all(self):
- self.notifications = []
- self.updates = []
- self.guid = str(uuid.uuid4())
+ with self._lock:
+ self.notifications = []
+ self.updates = []
+ self.guid = str(uuid.uuid4())
+ from python.helpers.state_monitor_integration import mark_dirty_all
+ mark_dirty_all(reason="notification.NotificationManager.clear_all")
def get_notifications_by_type(self, type: NotificationType) -> list[NotificationItem]:
- return [n for n in self.notifications if n.type == type]
\ No newline at end of file
+ with self._lock:
+ return [n for n in self.notifications if n.type == type]
diff --git a/python/helpers/persist_chat.py b/python/helpers/persist_chat.py
index 55867e6fe5..3e79030b95 100644
--- a/python/helpers/persist_chat.py
+++ b/python/helpers/persist_chat.py
@@ -44,7 +44,7 @@ def save_tmp_chat(context: AgentContext):
def save_tmp_chats():
"""Save all contexts to the chats folder"""
- for _, context in AgentContext._contexts.items():
+ for context in AgentContext.all():
# Skip BACKGROUND contexts as they should be ephemeral
if context.type == AgentContextType.BACKGROUND:
continue
@@ -164,13 +164,17 @@ def _serialize_agent(agent: Agent):
def _serialize_log(log: Log):
+ # Guard against concurrent log mutations while serializing.
+ with log._lock:
+ logs = [item.output() for item in log.logs[-LOG_SIZE:]] # serialize LogItem objects
+ guid = log.guid
+ progress = log.progress
+ progress_no = log.progress_no
return {
- "guid": log.guid,
- "logs": [
- item.output() for item in log.logs[-LOG_SIZE:]
- ], # serialize LogItem objects
- "progress": log.progress,
- "progress_no": log.progress_no,
+ "guid": guid,
+ "logs": logs,
+ "progress": progress,
+ "progress_no": progress_no,
}
@@ -271,6 +275,7 @@ def _deserialize_log(data: dict[str, Any]) -> "Log":
content=item_data.get("content", ""),
kvps=OrderedDict(item_data["kvps"]) if item_data["kvps"] else None,
temp=item_data.get("temp", False),
+ id=item_data.get("id", None),
)
)
log.updates.append(i)
diff --git a/python/helpers/print_style.py b/python/helpers/print_style.py
index 188697c866..25b7a408eb 100644
--- a/python/helpers/print_style.py
+++ b/python/helpers/print_style.py
@@ -1,8 +1,20 @@
import os, webcolors, html
import sys
from datetime import datetime
+from collections.abc import Mapping
from . import files
+_runtime_module = None
+
+
+def _get_runtime():
+ global _runtime_module
+ if _runtime_module is None:
+ from . import runtime as runtime_module # Local import to avoid circular dependency
+
+ _runtime_module = runtime_module
+ return _runtime_module
+
class PrintStyle:
last_endline = True
log_file_path = None
@@ -90,9 +102,39 @@ def _close_html_log():
with open(PrintStyle.log_file_path, "a") as f:
f.write("