From e91061aa6a6ef5cef03169237ba047738536e01b Mon Sep 17 00:00:00 2001 From: Benjamin <18634262+beplay@users.noreply.github.com> Date: Tue, 30 Apr 2024 11:03:28 +0000 Subject: [PATCH 1/3] Fix unclosed client session error Signed-off-by: Benjamin <18634262+beplay@users.noreply.github.com> --- app.py | 116 +++++++++++++++++++++++++---------------------- requirements.txt | 1 + 2 files changed, 62 insertions(+), 55 deletions(-) diff --git a/app.py b/app.py index 9d2d32d1fd..9b12660fac 100644 --- a/app.py +++ b/app.py @@ -5,6 +5,7 @@ import uuid from dotenv import load_dotenv import httpx +import asyncio from quart import ( Blueprint, Quart, @@ -13,6 +14,7 @@ request, send_from_directory, render_template, + current_app, ) from openai import AsyncAzureOpenAI @@ -49,11 +51,23 @@ UI_FAVICON = os.environ.get("UI_FAVICON") or "/favicon.ico" UI_SHOW_SHARE_BUTTON = os.environ.get("UI_SHOW_SHARE_BUTTON", "true").lower() == "true" +cosmos_db_ready = asyncio.Event() def create_app(): app = Quart(__name__) app.register_blueprint(bp) app.config["TEMPLATES_AUTO_RELOAD"] = True + + @app.before_serving + async def init(): + try: + app.cosmos_conversation_client = await init_cosmosdb_client() + cosmos_db_ready.set() + except Exception as e: + logging.exception("Failed to initialize CosmosDB client") + app.cosmos_conversation_client = None + raise e + return app @@ -308,7 +322,7 @@ def should_use_data(): # Initialize Azure OpenAI Client -def init_openai_client(use_data=SHOULD_USE_DATA): +async def init_openai_client(use_data=SHOULD_USE_DATA): azure_openai_client = None try: # API version check @@ -337,9 +351,11 @@ def init_openai_client(use_data=SHOULD_USE_DATA): ad_token_provider = None if not aoai_api_key: logging.debug("No AZURE_OPENAI_KEY found, using Azure AD auth") - ad_token_provider = get_bearer_token_provider( - DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default" - ) + async with DefaultAzureCredential() as credential: + ad_token_provider = get_bearer_token_provider( + credential, + "https://cognitiveservices.azure.com/.default" + ) # Deployment deployment = AZURE_OPENAI_MODEL @@ -364,8 +380,7 @@ def init_openai_client(use_data=SHOULD_USE_DATA): raise e -def init_cosmosdb_client(): - cosmos_conversation_client = None +async def init_cosmosdb_client(): if CHAT_HISTORY_ENABLED: try: cosmos_endpoint = ( @@ -373,7 +388,8 @@ def init_cosmosdb_client(): ) if not AZURE_COSMOSDB_ACCOUNT_KEY: - credential = DefaultAzureCredential() + async with DefaultAzureCredential() as cred: + credential = cred else: credential = AZURE_COSMOSDB_ACCOUNT_KEY @@ -833,7 +849,7 @@ async def send_chat_request(request): model_args = prepare_model_args(request) try: - azure_openai_client = init_openai_client() + azure_openai_client = await init_openai_client() raw_response = await azure_openai_client.chat.completions.with_raw_response.create(**model_args) response = raw_response.parse() apim_request_id = raw_response.headers.get("apim-request-id") @@ -909,6 +925,7 @@ def get_frontend_settings(): ## Conversation History API ## @bp.route("/history/generate", methods=["POST"]) async def add_conversation(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -918,15 +935,14 @@ async def add_conversation(): try: # make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") # check for the conversation_id, if the conversation is not set, we will create a new one history_metadata = {} if not conversation_id: title = await generate_title(request_json["messages"]) - conversation_dict = await cosmos_conversation_client.create_conversation( + conversation_dict = await current_app.cosmos_conversation_client.create_conversation( user_id=user_id, title=title ) conversation_id = conversation_dict["id"] @@ -937,7 +953,7 @@ async def add_conversation(): ## then write it to the conversation history in cosmos messages = request_json["messages"] if len(messages) > 0 and messages[-1]["role"] == "user": - createdMessageValue = await cosmos_conversation_client.create_message( + createdMessageValue = await current_app.cosmos_conversation_client.create_message( uuid=str(uuid.uuid4()), conversation_id=conversation_id, user_id=user_id, @@ -952,8 +968,6 @@ async def add_conversation(): else: raise Exception("No user message found") - await cosmos_conversation_client.cosmosdb_client.close() - # Submit request to Chat Completions for response request_body = await request.get_json() history_metadata["conversation_id"] = conversation_id @@ -967,6 +981,7 @@ async def add_conversation(): @bp.route("/history/update", methods=["POST"]) async def update_conversation(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -976,8 +991,7 @@ async def update_conversation(): try: # make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") # check for the conversation_id, if the conversation is not set, we will create a new one @@ -990,14 +1004,14 @@ async def update_conversation(): if len(messages) > 0 and messages[-1]["role"] == "assistant": if len(messages) > 1 and messages[-2].get("role", None) == "tool": # write the tool message first - await cosmos_conversation_client.create_message( + await current_app.cosmos_conversation_client.create_message( uuid=str(uuid.uuid4()), conversation_id=conversation_id, user_id=user_id, input_message=messages[-2], ) # write the assistant message - await cosmos_conversation_client.create_message( + await current_app.cosmos_conversation_client.create_message( uuid=messages[-1]["id"], conversation_id=conversation_id, user_id=user_id, @@ -1007,7 +1021,6 @@ async def update_conversation(): raise Exception("No bot messages found") # Submit request to Chat Completions for response - await cosmos_conversation_client.cosmosdb_client.close() response = {"success": True} return jsonify(response), 200 @@ -1018,9 +1031,9 @@ async def update_conversation(): @bp.route("/history/message_feedback", methods=["POST"]) async def update_message(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] - cosmos_conversation_client = init_cosmosdb_client() ## check request for message_id request_json = await request.get_json() @@ -1034,7 +1047,7 @@ async def update_message(): return jsonify({"error": "message_feedback is required"}), 400 ## update the message in cosmos - updated_message = await cosmos_conversation_client.update_message_feedback( + updated_message = await current_app.cosmos_conversation_client.update_message_feedback( user_id, message_id, message_feedback ) if updated_message: @@ -1064,6 +1077,7 @@ async def update_message(): @bp.route("/history/delete", methods=["DELETE"]) async def delete_conversation(): + await cosmos_db_ready.wait() ## get the user id from the request headers authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -1077,22 +1091,19 @@ async def delete_conversation(): return jsonify({"error": "conversation_id is required"}), 400 ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## delete the conversation messages from cosmos first - deleted_messages = await cosmos_conversation_client.delete_messages( + deleted_messages = await current_app.cosmos_conversation_client.delete_messages( conversation_id, user_id ) ## Now delete the conversation - deleted_conversation = await cosmos_conversation_client.delete_conversation( + deleted_conversation = await current_app.cosmos_conversation_client.delete_conversation( user_id, conversation_id ) - await cosmos_conversation_client.cosmosdb_client.close() - return ( jsonify( { @@ -1109,20 +1120,19 @@ async def delete_conversation(): @bp.route("/history/list", methods=["GET"]) async def list_conversations(): + await cosmos_db_ready.wait() offset = request.args.get("offset", 0) authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## get the conversations from cosmos - conversations = await cosmos_conversation_client.get_conversations( + conversations = await current_app.cosmos_conversation_client.get_conversations( user_id, offset=offset, limit=25 ) - await cosmos_conversation_client.cosmosdb_client.close() if not isinstance(conversations, list): return jsonify({"error": f"No conversations for {user_id} were found"}), 404 @@ -1133,6 +1143,7 @@ async def list_conversations(): @bp.route("/history/read", methods=["POST"]) async def get_conversation(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -1144,12 +1155,11 @@ async def get_conversation(): return jsonify({"error": "conversation_id is required"}), 400 ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## get the conversation object and the related messages from cosmos - conversation = await cosmos_conversation_client.get_conversation( + conversation = await current_app.cosmos_conversation_client.get_conversation( user_id, conversation_id ) ## return the conversation id and the messages in the bot frontend format @@ -1164,7 +1174,7 @@ async def get_conversation(): ) # get the messages for the conversation from cosmos - conversation_messages = await cosmos_conversation_client.get_messages( + conversation_messages = await current_app.cosmos_conversation_client.get_messages( user_id, conversation_id ) @@ -1180,12 +1190,12 @@ async def get_conversation(): for msg in conversation_messages ] - await cosmos_conversation_client.cosmosdb_client.close() return jsonify({"conversation_id": conversation_id, "messages": messages}), 200 @bp.route("/history/rename", methods=["POST"]) async def rename_conversation(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -1197,12 +1207,11 @@ async def rename_conversation(): return jsonify({"error": "conversation_id is required"}), 400 ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## get the conversation from cosmos - conversation = await cosmos_conversation_client.get_conversation( + conversation = await current_app.cosmos_conversation_client.get_conversation( user_id, conversation_id ) if not conversation: @@ -1220,16 +1229,16 @@ async def rename_conversation(): if not title: return jsonify({"error": "title is required"}), 400 conversation["title"] = title - updated_conversation = await cosmos_conversation_client.upsert_conversation( + updated_conversation = await current_app.cosmos_conversation_client.upsert_conversation( conversation ) - await cosmos_conversation_client.cosmosdb_client.close() return jsonify(updated_conversation), 200 @bp.route("/history/delete_all", methods=["DELETE"]) async def delete_all_conversations(): + await cosmos_db_ready.wait() ## get the user id from the request headers authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -1237,11 +1246,10 @@ async def delete_all_conversations(): # get conversations for user try: ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") - conversations = await cosmos_conversation_client.get_conversations( + conversations = await current_app.cosmos_conversation_client.get_conversations( user_id, offset=0, limit=None ) if not conversations: @@ -1250,15 +1258,14 @@ async def delete_all_conversations(): # delete each conversation for conversation in conversations: ## delete the conversation messages from cosmos first - deleted_messages = await cosmos_conversation_client.delete_messages( + deleted_messages = await current_app.cosmos_conversation_client.delete_messages( conversation["id"], user_id ) ## Now delete the conversation - deleted_conversation = await cosmos_conversation_client.delete_conversation( + deleted_conversation = await current_app.cosmos_conversation_client.delete_conversation( user_id, conversation["id"] ) - await cosmos_conversation_client.cosmosdb_client.close() return ( jsonify( { @@ -1275,6 +1282,7 @@ async def delete_all_conversations(): @bp.route("/history/clear", methods=["POST"]) async def clear_messages(): + await cosmos_db_ready.wait() ## get the user id from the request headers authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -1288,12 +1296,11 @@ async def clear_messages(): return jsonify({"error": "conversation_id is required"}), 400 ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## delete the conversation messages from cosmos - deleted_messages = await cosmos_conversation_client.delete_messages( + deleted_messages = await current_app.cosmos_conversation_client.delete_messages( conversation_id, user_id ) @@ -1313,18 +1320,17 @@ async def clear_messages(): @bp.route("/history/ensure", methods=["GET"]) async def ensure_cosmos(): + await cosmos_db_ready.wait() if not AZURE_COSMOSDB_ACCOUNT: return jsonify({"error": "CosmosDB is not configured"}), 404 try: - cosmos_conversation_client = init_cosmosdb_client() - success, err = await cosmos_conversation_client.ensure() - if not cosmos_conversation_client or not success: + success, err = await current_app.cosmos_conversation_client.ensure() + if not current_app.cosmos_conversation_client or not success: if err: return jsonify({"error": err}), 422 return jsonify({"error": "CosmosDB is not configured or not working"}), 500 - await cosmos_conversation_client.cosmosdb_client.close() return jsonify({"message": "CosmosDB is configured and working"}), 200 except Exception as e: logging.exception("Exception in /history/ensure") @@ -1364,7 +1370,7 @@ async def generate_title(conversation_messages): messages.append({"role": "user", "content": title_prompt}) try: - azure_openai_client = init_openai_client(use_data=False) + azure_openai_client = await init_openai_client(use_data=False) response = await azure_openai_client.chat.completions.create( model=AZURE_OPENAI_MODEL, messages=messages, temperature=1, max_tokens=64 ) diff --git a/requirements.txt b/requirements.txt index 047642ce2e..0b05ac8619 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ quart==0.19.4 uvicorn==0.24.0 aiohttp==3.9.2 gunicorn==20.1.0 +asyncio==3.4.3 \ No newline at end of file From 900169bf01b86f727985145f03cebd165dbabe6e Mon Sep 17 00:00:00 2001 From: Benjamin <18634262+beplay@users.noreply.github.com> Date: Tue, 30 Apr 2024 14:19:56 +0000 Subject: [PATCH 2/3] Remove unnecessary import Signed-off-by: Benjamin <18634262+beplay@users.noreply.github.com> --- requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 0b05ac8619..6d883628ea 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,5 +8,4 @@ azure-cosmos==4.5.0 quart==0.19.4 uvicorn==0.24.0 aiohttp==3.9.2 -gunicorn==20.1.0 -asyncio==3.4.3 \ No newline at end of file +gunicorn==20.1.0 \ No newline at end of file From 4d06074afeb5054a938a582b74deace1b5d4aae0 Mon Sep 17 00:00:00 2001 From: Benjamin <18634262+beplay@users.noreply.github.com> Date: Tue, 30 Apr 2024 16:12:40 +0000 Subject: [PATCH 3/3] Re-add assignment to prevent UnboundLocalError at app start Signed-off-by: Benjamin <18634262+beplay@users.noreply.github.com> --- app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/app.py b/app.py index 9b12660fac..224fd624af 100644 --- a/app.py +++ b/app.py @@ -381,6 +381,7 @@ async def init_openai_client(use_data=SHOULD_USE_DATA): async def init_cosmosdb_client(): + cosmos_conversation_client = None if CHAT_HISTORY_ENABLED: try: cosmos_endpoint = (