fix(chat): handle message creation in own thread #7303
Conversation
1 issue found across 1 file
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="backend/onyx/server/query_and_chat/chat_backend.py">
<violation number="1" location="backend/onyx/server/query_and_chat/chat_backend.py:580">
P2: Use `asyncio.get_running_loop()` instead of `asyncio.get_event_loop()`. In Python 3.10+, `get_event_loop()` is deprecated inside async functions. Since this is an async endpoint with a running event loop, `get_running_loop()` is the recommended and explicit approach.</violation>
</file>
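For reference, the suggested change is small. A minimal sketch, assuming the handler shape from the diff (`handler` is a stand-in for the real async endpoint):

```python
import asyncio

async def handler() -> None:  # stand-in for the real async endpoint
    buffer: asyncio.Queue[str | None] = asyncio.Queue()
    # get_running_loop() is explicit about requiring an active event
    # loop; get_event_loop() is deprecated inside coroutines since 3.10.
    loop = asyncio.get_running_loop()
```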
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
Greptile Overview
Greptile Summary
Fixes a bug where refreshing during streaming causes message processing to halt by implementing a producer-consumer pattern using asyncio.Queue and a background thread. The producer thread continues processing messages even if the client connection drops, ensuring chat messages are saved to the database. Headers are captured before thread creation to preserve request context across the thread boundary.
Confidence Score: 2/5
- Moderate risk - fixes the refresh bug but introduces resource management issues with orphaned background threads
- The PR successfully addresses the refresh bug by decoupling message processing from client streaming using a producer-consumer pattern. However, there's a critical issue: if the consumer fails or terminates early (e.g., client disconnect), the producer thread continues running indefinitely with no cleanup mechanism. The background thread reference isn't stored, making it impossible to clean up. Additionally, the unbounded queue could lead to memory issues, and there's temporary debug logging that should be removed. These resource management concerns warrant careful testing before merge.
- backend/onyx/server/query_and_chat/chat_backend.py requires attention for thread lifecycle management and error handling between producer and consumer
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| backend/onyx/server/query_and_chat/chat_backend.py | 2/5 | Implements producer-consumer pattern for streaming chat messages using asyncio.Queue and background thread; missing error propagation and consumer cancellation handling |
Sequence Diagram
```mermaid
sequenceDiagram
    participant Client
    participant FastAPI as FastAPI Handler<br/>(async)
    participant Consumer as stream_from_buffer()<br/>(async generator)
    participant Queue as asyncio.Queue
    participant Producer as producer()<br/>(background thread)
    participant DB as handle_stream_message_objects
    Client->>FastAPI: POST /send-chat-message
    FastAPI->>FastAPI: Create asyncio.Queue
    FastAPI->>FastAPI: Capture request headers
    FastAPI->>Producer: run_in_background(producer)
    activate Producer
    FastAPI->>Consumer: StreamingResponse(stream_from_buffer())
    activate Consumer
    FastAPI-->>Client: Start SSE stream
    Producer->>DB: Iterate handle_stream_message_objects()
    activate DB
    loop For each message object
        DB-->>Producer: yield message object
        Producer->>Queue: loop.call_soon_threadsafe(put_nowait)
        Queue-->>Consumer: await buffer.get()
        Consumer-->>Client: yield JSON line
    end
    DB-->>Producer: Iteration complete
    deactivate DB
    Producer->>Queue: put_nowait(None) - end signal
    deactivate Producer
    Queue-->>Consumer: await buffer.get() returns None
    Consumer-->>Client: Close stream
    deactivate Consumer
    Note over Producer,Consumer: If client disconnects early,<br/>consumer stops but producer<br/>continues processing
```
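For readers following the diagram, here is a self-contained sketch of the same pattern. It is illustrative only: `produce_messages` stands in for `handle_stream_message_objects`, and a plain `threading.Thread` for Onyx's `run_in_background` helper.

```python
import asyncio
import threading
from collections.abc import AsyncGenerator, Iterator


def produce_messages() -> Iterator[str]:
    # Stand-in for handle_stream_message_objects().
    for i in range(3):
        yield f'{{"answer_piece": {i}}}\n'


async def main() -> None:
    buffer: asyncio.Queue[str | None] = asyncio.Queue()
    loop = asyncio.get_running_loop()

    def producer() -> None:
        # Runs in its own thread, so iteration (and, in the real code,
        # DB persistence) continues even if the consumer goes away.
        for line in produce_messages():
            loop.call_soon_threadsafe(buffer.put_nowait, line)
        loop.call_soon_threadsafe(buffer.put_nowait, None)  # end signal

    threading.Thread(target=producer, daemon=True).start()

    async def stream_from_buffer() -> AsyncGenerator[str, None]:
        while True:
            item = await buffer.get()
            if item is None:
                break
            yield item

    async for chunk in stream_from_buffer():
        print(chunk, end="")


asyncio.run(main())
```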
```python
return StreamingResponse(stream_generator(), media_type="text/event-stream")
```

```python
# Start the producer in a background thread
run_in_background(producer)
```
The producer thread continues running even if the client disconnects, which is the intended fix for the refresh bug. However, if the consumer (stream_from_buffer) terminates early (e.g., client disconnect, error), the producer thread has no way to know and will keep processing. This could lead to wasted resources processing messages nobody will receive.
Consider tracking the consumer's state and providing a way for the producer to check if it should stop early, or use a cancellation token pattern.
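One shape the suggestion could take, sketched with a `threading.Event` as the cancellation token. This is illustrative only (and, as the author notes below, the PR intentionally lets the producer run to completion):

```python
import threading
import time

stop_event = threading.Event()  # hypothetical cancellation token

def producer() -> None:
    for _ in range(1000):
        if stop_event.is_set():
            break  # consumer signaled shutdown; stop early
        time.sleep(0.01)  # stand-in for per-message processing

thread = threading.Thread(target=producer)
thread.start()
# The consumer's finally block would call this when the client is gone:
stop_event.set()
thread.join()
```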
We should only stop if the user presses the stop button. Another mechanism already takes care of this.
```python
async def stream_from_buffer() -> AsyncGenerator[str, None]:
    """
    Async generator that reads from the buffer and yields to the client.
    """
    try:
        while True:
            # Await the next item from the buffer
            item = await buffer.get()
            if item is None:
                # End of stream signal
                break
            yield item
    finally:
        logger.debug("Stream consumer finished")
```
If stream_from_buffer() raises an exception (e.g., during buffer.get()), the consumer terminates but the producer thread continues running in the background. The producer will keep writing to the buffer indefinitely, wasting resources. The background thread started by run_in_background has no reference stored and cannot be cleaned up.
Consider storing the thread reference and implementing cleanup logic to handle consumer failures, or add try-except around the consumer to signal the producer to stop.
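A sketch of the thread-reference idea, assuming a hypothetical variant of the background helper that exposes the `Thread` handle (names are made up):

```python
import threading

def producer() -> None:
    pass  # stand-in for draining handle_stream_message_objects

# Keeping the handle (unlike a fire-and-forget helper) lets cleanup
# code observe or wait for the producer instead of losing it.
producer_thread = threading.Thread(target=producer, name="chat-producer")
producer_thread.start()
producer_thread.join(timeout=30)
```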
We don't want to stop the producer. The producer should complete its job.
```python
buffer: asyncio.Queue[str | None] = asyncio.Queue()
loop = asyncio.get_event_loop()
```
The asyncio.Queue is created without a maxsize limit, which means the producer can fill the queue indefinitely if it produces faster than the consumer can consume. If the consumer is blocked or slow, this could lead to unbounded memory growth.
Consider adding a reasonable maxsize parameter (e.g., asyncio.Queue(maxsize=100)) to apply backpressure and prevent memory issues.
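The bounded variant is a one-line change (the value 100 is illustrative), though it only applies backpressure if the producer side uses an awaiting put; see the sketch under the next comment:

```python
import asyncio

# Illustrative bound: once 100 items are waiting, an awaiting put()
# blocks until the consumer catches up, capping memory use.
buffer: asyncio.Queue[str | None] = asyncio.Queue(maxsize=100)
```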
Not expecting this to be a limitation.
```python
loop.call_soon_threadsafe(
    buffer.put_nowait, get_json_line(obj.model_dump())
)
```
If the asyncio queue is full (when a maxsize is set), put_nowait raises asyncio.QueueFull; because the call is scheduled with call_soon_threadsafe, the exception would surface in the event loop's exception handler rather than anywhere the producer can react to it.
While the queue is currently unbounded, if backpressure is added in the future, this needs proper error handling.
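If a bound is ever introduced, one robust option is to swap the non-blocking put for an awaiting one scheduled with `asyncio.run_coroutine_threadsafe`. A hedged sketch (the helper name is made up):

```python
import asyncio

def enqueue_from_thread(
    loop: asyncio.AbstractEventLoop,
    buffer: asyncio.Queue[str | None],
    line: str,
) -> None:
    # An awaiting put() scheduled on the loop blocks this producer
    # thread when the queue is full instead of raising QueueFull,
    # so a future maxsize would be honored without extra handling.
    asyncio.run_coroutine_threadsafe(buffer.put(line), loop).result()
```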
There is currently no maxsize set, and for now we don't expect to add one.
Description
There is a bug where refreshing while streaming drops the connection, so FastAPI stops yielding results from the streaming object. Message processing in the `handle_stream_message_objects` function then blocks (halts) on the yield, the control flow never continues, and the finished chat never gets saved to the database.
This PR spins the message creation/processing logic into its own producer thread and puts the results in a buffer that the consumer streams to the client.
How Has This Been Tested?
Manually tested that chats still stream
Summary by cubic
Fixes chat streaming hangs on refresh by moving message creation to a background producer thread and streaming from an async buffer. Ensures chats finish processing and are saved even if the SSE connection drops.
Written for commit 5848975. Summary will update on new commits.