Feat: Add SSE streaming endpoint #6
Conversation
- Reduce perceived latency by increasing state transparency for long-running responses
Pull request overview
This pull request adds Server-Sent Events (SSE) streaming functionality to the RAG pipeline to improve transparency and reduce perceived latency during long-running vector database queries. It introduces a new streaming endpoint that sends real-time progress updates as queries are processed.
Changes:
- Added new SSE streaming endpoint `/answerQuestion/stream` using the GET method
- Created async generator `async_gen_process_sql_category` to support streaming responses with intermediate progress updates
- Implemented structured SSE message format with status indicators for vector search completion, VQL generation, and final results
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 13 comments.
| File | Description |
|---|---|
| api/utils/sdk_answer_question.py | Added async generator function to process SQL category questions with streaming support, yielding intermediate results during VQL generation and execution |
| api/endpoints/answerQuestion.py | Added new GET endpoint for SSE streaming and generator function to orchestrate the streaming response with progress updates |
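The SSE message format described above can be exercised with a small stdlib-only parser. This is an illustrative sketch: the payload shapes follow the `status`/`result` fields named in this PR, but the sample events themselves are made up, not taken from the codebase.

```python
import json

def parse_sse_events(raw_stream: str):
    """Extract JSON payloads from 'data: ...' lines of an SSE stream."""
    events = []
    for block in raw_stream.split("\n\n"):
        for line in block.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

# Example payloads mirroring the statuses described in this PR (illustrative)
raw = (
    'data: {"status": "vql_generation_completed", "result": {"answer": "SELECT 1", "message": "VQL generation completed"}}\n\n'
    'data: {"status": "completed", "result": {"answer": "done"}}\n\n'
)
events = parse_sse_events(raw)
print([e["status"] for e in events])  # ['vql_generation_completed', 'completed']
```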
```python
# async generator
async def async_gen_process_sql_category(request, vector_search_tables, sql_gen_llm, chat_llm, category_response, auth, timings, session_id = None, sample_data = None):
```
The async generator function lacks a docstring explaining its purpose, parameters, yields, and behavior. This is especially important for generator functions as they have different execution patterns. Add a comprehensive docstring documenting the function's purpose (streaming SQL category processing), its parameters, what it yields (dict with 'type' and 'data' keys), and any important behavioral notes about the streaming protocol.
Suggested change:

```python
async def async_gen_process_sql_category(request, vector_search_tables, sql_gen_llm, chat_llm, category_response, auth, timings, session_id = None, sample_data = None):
    """
    Stream the processing of a SQL (VQL) category question and its LLM-based enrichment.

    This async generator mirrors the behavior of ``process_sql_category`` but exposes
    its work as a sequence of streaming events. It incrementally performs VQL query
    generation, optional query fixing, execution, and response enrichment, yielding
    structured messages that callers can consume as they arrive instead of waiting
    for the full result.

    Parameters
    ----------
    request :
        An object representing the incoming question and options. It is expected to
        provide fields such as ``question``, ``custom_instructions``,
        ``vector_search_sample_data_k``, and flags that influence response shape
        (for example, verbosity or plotting options).
    vector_search_tables :
        Metadata or configuration describing the tables, views, or indices that can
        be used for vector / semantic search when generating and executing VQL.
    sql_gen_llm :
        LLM client used for translating the natural language question into a VQL
        query and for any follow-up SQL/VQL related reasoning.
    chat_llm :
        LLM client used primarily for conversational or explanatory responses such
        as natural language answers, explanations, and related questions.
    category_response :
        Parsed category information and filters derived from earlier classification
        steps, used to constrain VQL generation (for example, selected schema,
        metrics, or dimensions).
    auth :
        Authentication or authorization context associated with the request. Passed
        through to lower-level helpers that may require user or tenant information.
    timings : dict
        Mutable dictionary used to accumulate timing information for different
        stages of processing. This generator updates the ``"llm_time"`` entry and
        any other timing keys used by helper functions.
    session_id : Optional[str], default None
        Identifier for the current session / conversation, forwarded to LLM
        backends and logging or tracing utilities.
    sample_data :
        Optional sample data or configuration used to augment vector search or
        improve query generation quality.

    Yields
    ------
    dict
        A streaming event represented as a dictionary with at least:

        * ``"type"``: A string describing the kind of event (for example,
          ``"result"`` for the final structured response).
        * ``"data"``: The associated payload for that event. For a final
          ``"result"`` event this is typically the complete response object
          including query information, answer text, related questions,
          tokens, timings, and any generated graph data.

    Notes
    -----
    * The generator may yield multiple events over the lifetime of a single
      request. Consumers should iterate until completion rather than assuming
      a single value.
    * When no valid VQL can be generated, the generator yields a final
      ``"result"`` event whose ``"data"`` includes an explanatory message in
      the ``"answer"`` field.
    * Timing information in ``timings`` is updated as processing progresses;
      the final ``"result"`` reflects the accumulated timings at the end of
      the pipeline.
    """
```
```python
# async generator
async def async_gen_process_sql_category(request, vector_search_tables, sql_gen_llm, chat_llm, category_response, auth, timings, session_id = None, sample_data = None):
```
Inconsistent spacing in function parameter list. The parameters 'session_id = None' and 'sample_data = None' have spaces around the equals sign, but PEP 8 style guide recommends no spaces around equals signs for default parameter values. This should be 'session_id=None' and 'sample_data=None' to follow Python conventions and be consistent with standard formatting.
```python
c_data = chunk.get("data")

if event == "query_gen":
    # print(c_data)
```
Commented-out print statement should be removed. Debug code like '# print(c_data)' should not be committed to the codebase. Either remove it entirely or replace it with proper logging if the information is needed for debugging.
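If the chunk contents are still useful for debugging, leveled logging is the usual replacement. A minimal sketch (the logger name and helper are illustrative, not from the PR):

```python
import io
import logging

# Hypothetical logger name for this endpoint module
logger = logging.getLogger("answerQuestion")
handler = logging.StreamHandler(io.StringIO())
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

def report_chunk(c_data):
    # Replaces the commented-out print: silent in production
    # unless DEBUG logging is enabled for this logger.
    logger.debug("query_gen chunk received: %r", c_data)

report_chunk({"vql": "SELECT 1"})
print(handler.stream.getvalue().strip())
```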
Suggested change:

```diff
- # print(c_data)
```
The early exit path when no VQL query is generated does not return after yielding the result. This causes the code to continue executing and call query_fixer with a None/empty vql_query, which will likely cause an error. Add a return statement after the yield on line 405 to properly exit the function.
Suggested change:

```diff
+ return
```
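The fix generalizes: in an async generator, a bare `return` after a `yield` is the only way to end the stream early (returning a value is a syntax error there). A minimal sketch of the early-exit pattern, with illustrative event names matching this PR's protocol:

```python
import asyncio

async def gen(query):
    if not query:
        # Early exit: yield the final event, then stop the generator.
        yield {"type": "result", "data": {"answer": "No VQL query was generated."}}
        return  # bare return ends the generator; 'return value' would be a SyntaxError
    yield {"type": "query_gen", "data": query}
    yield {"type": "result", "data": {"answer": "done"}}

async def collect(q):
    return [chunk["type"] async for chunk in gen(q)]

print(asyncio.run(collect("")))          # ['result']
print(asyncio.run(collect("SELECT 1")))  # ['query_gen', 'result']
```

Without the `return`, execution would continue past the early-exit block exactly as the reviewer describes.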
```python
@router.get( # SSE response
    '/answerQuestion/stream',
    response_class = StreamingResponse,
    tags = ['Ask a Question']
)
@handle_endpoint_error("answerQuestion")
async def answer_question_stream(
    request: answerQuestionRequest = Query(),
```
The function uses GET method for a streaming endpoint that processes questions. GET requests typically don't support request bodies, but the code expects answerQuestionRequest from Query parameters. Complex objects like answerQuestionRequest are not well-suited for query parameters. Consider using POST method instead, which is more appropriate for sending structured data and processing operations.
```python
async def async_gen_process_sql_category(request, vector_search_tables, sql_gen_llm, chat_llm, category_response, auth, timings, session_id = None, sample_data = None):
    with timing_context("llm_time", timings):
        vql_query, query_explanation, query_to_vql_tokens = await sdk_ai_tools.query_to_vql(
            query=request.question,
            vector_search_tables=vector_search_tables,
            llm=sql_gen_llm,
            filter_params=category_response,
            custom_instructions=request.custom_instructions,
            vector_search_sample_data_k=request.vector_search_sample_data_k,
            session_id=session_id,
            sample_data=sample_data
        )

    # Early exit if no valid VQL could be generated
    if not vql_query:
        response = prepare_response(
            vql_query='',
            query_explanation=query_explanation,
            tokens=query_to_vql_tokens,
            execution_result={},
            vector_search_tables=vector_search_tables,
            raw_graph='',
            timings=timings
        )
        response['answer'] = 'No VQL query was generated because no relevant schema was found.'
        yield {"type": "result", "data": response}

    vql_query, _, query_fixer_tokens = await sdk_ai_tools.query_fixer(
        question=request.question,
        query=vql_query,
        query_explanation=query_explanation,
        llm=sql_gen_llm,
        session_id=session_id,
        vector_search_sample_data_k=request.vector_search_sample_data_k,
        vector_search_tables=vector_search_tables,
        sample_data=sample_data
    )

    max_attempts = 2
    attempt = 0
    fixer_history = []
    original_vql_query = vql_query

    # return chunk ( generated vql )
    yield {"type": "query_gen", "data": original_vql_query}

    while attempt < max_attempts:
        vql_query, execution_result, vql_status_code, timings, fixer_history, query_fixer_tokens = await attempt_query_execution(
            vql_query=vql_query,
            request=request,
            auth=auth,
            timings=timings,
            vector_search_tables=vector_search_tables,
            session_id=session_id,
            query_explanation=query_explanation,
            query_fixer_tokens=query_fixer_tokens,
            fixer_history=fixer_history,
            sample_data=sample_data,
            llm=sql_gen_llm
        )

        if attempt == 0:
            original_execution_result = execution_result
            original_vql_status_code = vql_status_code

        if vql_query == 'OK':
            vql_query = original_vql_query
            break
        elif vql_status_code not in [499, 500]:
            break

        attempt += 1

    if vql_status_code in [499, 500]:
        if vql_query:
            execution_result, vql_status_code, timings = await execute_query(
                vql_query=vql_query,
                auth=auth,
                limit=request.vql_execute_rows_limit,
                timings=timings
            )
            if vql_status_code == 500 or (vql_status_code == 499 and original_vql_status_code == 499):
                vql_query = original_vql_query
                execution_result = original_execution_result
                vql_status_code = original_vql_status_code
        else:
            vql_status_code = 500
            execution_result = "No VQL query was generated."

    llm_execution_result = prepare_execution_result(
        execution_result=execution_result,
        llm_response_rows_limit=request.llm_response_rows_limit,
        vql_status_code=vql_status_code
    )

    raw_graph, plot_data, request = handle_plotting(request=request, execution_result=execution_result)

    response = prepare_response(
        vql_query=vql_query,
        query_explanation=query_explanation,
        tokens=add_tokens(query_to_vql_tokens, query_fixer_tokens),
        execution_result=execution_result if vql_status_code == 200 else {},
        vector_search_tables=vector_search_tables,
        raw_graph=raw_graph,
        timings=timings
    )

    if request.verbose or request.plot:
        response = await enhance_verbose_response(
            request=request,
            response=response,
            vql_query=vql_query,
            llm_execution_result=llm_execution_result,
            vector_search_tables=vector_search_tables,
            plot_data=plot_data,
            timings=timings,
            session_id=session_id,
            sample_data=sample_data,
            chat_llm=chat_llm,
            sql_gen_llm=sql_gen_llm
        )

    if request.disclaimer:
        response['answer'] += "\n\nDISCLAIMER: This response has been generated based on an LLM's interpretation of the data and may not be accurate."

    # return response

    # return final chunk
    yield {"type": "result", "data": response}
```
There is significant code duplication between async_gen_process_sql_category and the existing process_sql_category function. Both functions share almost identical logic for VQL generation, query fixing, execution, and response preparation. Consider refactoring to extract the shared logic into helper functions or making the streaming behavior an optional parameter to avoid maintaining two separate implementations of the same logic.
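One common shape for that refactor is to keep a single generator-based implementation and have the non-streaming function simply drain it. A minimal sketch under that assumption (all names are illustrative, not the PR's helpers):

```python
import asyncio

async def process_stream(question):
    """Single implementation of the pipeline, always expressed as a generator."""
    vql = f"SELECT /* for: {question} */ 1"  # placeholder for VQL generation
    yield {"type": "query_gen", "data": vql}
    # ... query fixing, execution, enrichment would go here ...
    yield {"type": "result", "data": {"answer": "done", "vql": vql}}

async def process(question):
    """Non-streaming wrapper: drain the generator and return the final payload."""
    last = None
    async for chunk in process_stream(question):
        last = chunk
    return last["data"]

print(asyncio.run(process("total sales"))["answer"])  # done
```

With this layout, `process_sql_category` and `async_gen_process_sql_category` would share one body, and only the wrapper differs.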
```python
if request.disclaimer:
    response['answer'] += "\n\nDISCLAIMER: This response has been generated based on an LLM's interpretation of the data and may not be accurate."

# return response
```
Commented-out code should be removed. The comment '# return response' on line 508 is leftover code that serves no purpose and should be deleted. Comments should explain why, not repeat what the code does or contain unused code.
Suggested change:

```diff
- # return response
```
```python
with timing_context("llm_time", timings):
    category, category_response, category_related_questions, sql_category_tokens = await sdk_ai_tools.sql_category(
        query=request_data.question,
        vector_search_tables=vector_search_tables,
        llm=llm,
        mode=request_data.mode,
        custom_instructions=request_data.custom_instructions,
        session_id=session_id
    )

if category == "SQL":
    async for chunk in sdk_answer_question.async_gen_process_sql_category(
        request=request_data,
        vector_search_tables=vector_search_tables,
        category_response=category_response,
        auth=auth,
        timings=timings,
        session_id=session_id,
        sample_data=sample_data,
        chat_llm=llm,
        sql_gen_llm=llm
    ):
        event = chunk.get("type")
        c_data = chunk.get("data")

        if event == "query_gen":
            # print(c_data)
            vql_progress_data = {
                "status": "vql_generation_completed",
                "result": {
                    "answer": c_data,
                    "message": "VQL generation completed"
                }
            }
            yield f"data: {json.dumps(vql_progress_data)}\n\n"
        elif event == "result":
            response = c_data

    response['tokens'] = add_tokens(response['tokens'], sql_category_tokens)
elif category == "METADATA":
    response = sdk_answer_question.process_metadata_category(
        category_response=category_response,
        category_related_questions=category_related_questions,
        vector_search_tables=vector_search_tables,
        timings=timings,
        tokens=sql_category_tokens,
        disclaimer=request_data.disclaimer
    )
else:
    response = sdk_answer_question.process_unknown_category(timings=timings)

response['llm_provider'] = request_data.llm_provider
response['llm_model'] = request_data.llm_model

# Send final completion message
final_data = {
    "status": "completed",
    "result": jsonable_encoder(response)
}
yield f"data: {json.dumps(final_data)}\n\n"
```
Missing error handling for exceptions that could occur after the initial try-except block. Operations like sql_category, async_gen_process_sql_category, process_metadata_category, and process_unknown_category could raise exceptions, but there's no try-except to catch them and yield an appropriate error message to the SSE client. In a streaming context, unhandled exceptions will abruptly terminate the connection. Consider wrapping the main processing logic in a try-except block and yielding error messages in SSE format when exceptions occur.
Suggested change (wrap the processing logic shown above in a try/except that emits an SSE-formatted error event):

```python
try:
    with timing_context("llm_time", timings):
        category, category_response, category_related_questions, sql_category_tokens = await sdk_ai_tools.sql_category(
            query=request_data.question,
            vector_search_tables=vector_search_tables,
            llm=llm,
            mode=request_data.mode,
            custom_instructions=request_data.custom_instructions,
            session_id=session_id
        )
    if category == "SQL":
        async for chunk in sdk_answer_question.async_gen_process_sql_category(
            request=request_data,
            vector_search_tables=vector_search_tables,
            category_response=category_response,
            auth=auth,
            timings=timings,
            session_id=session_id,
            sample_data=sample_data,
            chat_llm=llm,
            sql_gen_llm=llm
        ):
            event = chunk.get("type")
            c_data = chunk.get("data")
            if event == "query_gen":
                vql_progress_data = {
                    "status": "vql_generation_completed",
                    "result": {
                        "answer": c_data,
                        "message": "VQL generation completed"
                    }
                }
                yield f"data: {json.dumps(vql_progress_data)}\n\n"
            elif event == "result":
                response = c_data
        response['tokens'] = add_tokens(response['tokens'], sql_category_tokens)
    elif category == "METADATA":
        response = sdk_answer_question.process_metadata_category(
            category_response=category_response,
            category_related_questions=category_related_questions,
            vector_search_tables=vector_search_tables,
            timings=timings,
            tokens=sql_category_tokens,
            disclaimer=request_data.disclaimer
        )
    else:
        response = sdk_answer_question.process_unknown_category(timings=timings)
    response['llm_provider'] = request_data.llm_provider
    response['llm_model'] = request_data.llm_model
    # Send final completion message
    final_data = {
        "status": "completed",
        "result": jsonable_encoder(response)
    }
    yield f"data: {json.dumps(final_data)}\n\n"
except Exception as e:
    # Log the exception and send an SSE-formatted error message
    logging.exception("Error while processing SSE question")
    error_data = {
        "status": "error",
        "error": {
            "message": str(e),
            "type": e.__class__.__name__,
        },
    }
    yield f"data: {json.dumps(error_data)}\n\n"
```
```python
if not vector_search_tables:
    error_data = {"error": "The vector search result returned 0 views. This could be due to limited permissions or an empty vector store."}
    yield f"data: {json.dumps(error_data)}\n\n"
    return
```
When vector_search_tables is empty, the function yields an error message but doesn't include a status field in the JSON. This is inconsistent with other SSE messages which include a "status" field (e.g., "vector_search_completed", "completed"). Consider adding a status field like "error" for consistency and easier client-side parsing.
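A small helper keeps the error shape consistent with the other events. The helper name is illustrative; the `status` field matches the convention used by the PR's other SSE messages:

```python
import json

def sse_error(message: str) -> str:
    """Format an error event with the same top-level 'status' field
    as the other SSE messages emitted by this endpoint."""
    payload = {"status": "error", "error": {"message": message}}
    return f"data: {json.dumps(payload)}\n\n"

msg = sse_error("The vector search result returned 0 views.")
print(msg)
```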
```python
if category == "SQL":
    async for chunk in sdk_answer_question.async_gen_process_sql_category(
        request=request_data,
        vector_search_tables=vector_search_tables,
        category_response=category_response,
        auth=auth,
        timings=timings,
        session_id=session_id,
        sample_data=sample_data,
        chat_llm=llm,
        sql_gen_llm=llm
    ):
        event = chunk.get("type")
        c_data = chunk.get("data")

        if event == "query_gen":
            # print(c_data)
            vql_progress_data = {
                "status": "vql_generation_completed",
                "result": {
                    "answer": c_data,
                    "message": "VQL generation completed"
                }
            }
            yield f"data: {json.dumps(vql_progress_data)}\n\n"
        elif event == "result":
            response = c_data

    response['tokens'] = add_tokens(response['tokens'], sql_category_tokens)
elif category == "METADATA":
    response = sdk_answer_question.process_metadata_category(
        category_response=category_response,
        category_related_questions=category_related_questions,
        vector_search_tables=vector_search_tables,
        timings=timings,
        tokens=sql_category_tokens,
        disclaimer=request_data.disclaimer
    )
else:
    response = sdk_answer_question.process_unknown_category(timings=timings)
```
The variable 'response' is used before being assigned when category is "METADATA" or unknown. The response variable is only set inside the if statement for category == "SQL", but is then accessed unconditionally at line 404 and later lines. This will cause an UnboundLocalError if the category is not "SQL". Initialize the response variable before the conditional logic or handle all category cases properly.
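The fix is to give `response` a default before branching. A minimal sketch of the pattern (function and values are illustrative, not the PR's code):

```python
def dispatch(category):
    # Initialize before branching so later access never raises UnboundLocalError,
    # even if no branch (or no generator event) assigns a value.
    response = {"answer": "", "tokens": {}}
    if category == "SQL":
        response = {"answer": "sql path", "tokens": {}}
    elif category == "METADATA":
        response = {"answer": "metadata path", "tokens": {}}
    # Unknown categories fall through with the default response.
    response["llm_provider"] = "example"
    return response

print(dispatch("OTHER")["answer"])  # ''
```

Note the same hazard exists inside the SQL branch itself: if the generator never yields a `"result"` event, `response` is still unassigned there without a default.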
Force-pushed from cbfdaa5 to 160cdfa
Summary
Some VDB queries in the RAG pipeline can take a long time, and users had no visibility into whether their requests were still being processed.
This PR introduces SSE-based streaming to make the request state visible during long-running operations.
Changes
Impact