Skip to content

Commit bdfddfa

Browse files
committed
fix: resolve agent framework compatibility and event loop conflicts
- Update get_agent usage to use app.get_agent() method (agent-framework-azurefunctions API change) - Fix event loop conflict in Cosmos DB client by using per-loop storage - Prevent 'attached to a different loop' runtime errors when agents call tools - Add asyncio import and _get_loop_id helper for loop-aware client management
1 parent 9d3f308 commit bdfddfa

File tree

2 files changed

+49
-37
lines changed

2 files changed

+49
-37
lines changed

src/data/cosmos_ops.py

Lines changed: 43 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import os
88
import logging
9+
import asyncio
910
from azure.cosmos.aio import CosmosClient
1011
from azure.cosmos import PartitionKey
1112
from azure.cosmos.exceptions import CosmosResourceNotFoundError
@@ -19,36 +20,45 @@
1920
COSMOS_CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME", "code-snippets")
2021
COSMOS_VECTOR_TOP_K = int(os.environ.get("COSMOS_VECTOR_TOP_K", "30"))
2122

22-
# Singleton references for client, database, and container caching
23-
# This ensures we reuse connections across calls
24-
_cosmos_client = None
25-
_database = None
26-
_container = None
23+
# Per-event-loop storage for client, database, and container
24+
# This prevents "attached to a different loop" errors when tools are called from agent framework
25+
_clients = {}
26+
_databases = {}
27+
_containers = {}
2728

28-
# Gets or creates the singleton Cosmos client, caching it for reuse
29+
def _get_loop_id():
30+
"""Get a unique identifier for the current event loop."""
31+
try:
32+
loop = asyncio.get_running_loop()
33+
return id(loop)
34+
except RuntimeError:
35+
return None
36+
37+
# Gets or creates the Cosmos client for the current event loop
2938
async def get_cosmos_client():
3039
"""
31-
Gets or creates the singleton Cosmos client.
40+
Gets or creates the Cosmos client for the current event loop.
41+
This prevents event loop conflicts when called from different async contexts.
3242
"""
33-
global _cosmos_client
34-
if _cosmos_client is None:
35-
logger.debug("Creating Cosmos client")
36-
_cosmos_client = CosmosClient(
43+
loop_id = _get_loop_id()
44+
if loop_id not in _clients:
45+
logger.debug(f"Creating Cosmos client for loop {loop_id}")
46+
_clients[loop_id] = CosmosClient(
3747
url=os.environ["COSMOS_ENDPOINT"],
3848
credential=DefaultAzureCredential()
3949
)
40-
return _cosmos_client
50+
return _clients[loop_id]
4151

42-
# Gets or creates the singleton Cosmos database reference
52+
# Gets or creates the Cosmos database reference for the current event loop
4353
async def get_database():
4454
"""
45-
Gets or creates the singleton database.
55+
Gets or creates the database for the current event loop.
4656
"""
47-
global _database
48-
if _database is None:
57+
loop_id = _get_loop_id()
58+
if loop_id not in _databases:
4959
client = await get_cosmos_client()
50-
_database = await client.create_database_if_not_exists(COSMOS_DATABASE_NAME)
51-
return _database
60+
_databases[loop_id] = await client.create_database_if_not_exists(COSMOS_DATABASE_NAME)
61+
return _databases[loop_id]
5262

5363
# Gets or creates the Cosmos DB container with proper partition key and vector index configuration
5464
# The container is set up with a partition on /name and a vectorEmbeddingPolicy on /embedding
@@ -67,16 +77,16 @@ async def get_container():
6777
Raises:
6878
Exception: If container creation or configuration fails
6979
"""
70-
global _container
71-
if _container is None:
80+
loop_id = _get_loop_id()
81+
if loop_id not in _containers:
7282
try:
7383
logger.info(f"Getting container '{COSMOS_CONTAINER_NAME}' from database '{COSMOS_DATABASE_NAME}'")
7484

7585
database = await get_database()
7686

7787
# Create container with vector index configuration
7888
logger.debug("Creating container with vector index configuration")
79-
_container = await database.create_container_if_not_exists(
89+
_containers[loop_id] = await database.create_container_if_not_exists(
8090
id=COSMOS_CONTAINER_NAME,
8191
partition_key=PartitionKey(path="/name"),
8292
indexing_policy={
@@ -109,20 +119,22 @@ async def get_container():
109119
except Exception as e:
110120
logger.error(f"Error configuring Cosmos container: {str(e)}", exc_info=True)
111121
raise
112-
return _container
122+
return _containers[loop_id]
113123

114-
# Closes all Cosmos DB connections and resets cached client, database, and container
124+
# Closes all Cosmos DB connections for the current event loop
115125
async def close_connections():
116126
"""
117-
Closes all Cosmos DB connections.
127+
Closes Cosmos DB connections for the current event loop.
118128
"""
119-
global _cosmos_client, _database, _container
120-
if _cosmos_client is not None:
121-
await _cosmos_client.close()
122-
_cosmos_client = None
123-
_database = None
124-
_container = None
125-
logger.info("Closed Cosmos DB connections")
129+
loop_id = _get_loop_id()
130+
if loop_id in _clients:
131+
await _clients[loop_id].close()
132+
del _clients[loop_id]
133+
if loop_id in _databases:
134+
del _databases[loop_id]
135+
if loop_id in _containers:
136+
del _containers[loop_id]
137+
logger.info(f"Closed Cosmos DB connections for loop {loop_id}")
126138

127139
# Upserts a document into Cosmos DB with vector embeddings
128140
# The document includes id, name, projectId, code, type, and embedding fields

src/durable_agents.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import azure.functions as func
3232
import azure.durable_functions as df
3333
from azure.durable_functions import DurableOrchestrationContext
34-
from agent_framework_azurefunctions import AgentFunctionApp, get_agent
34+
from agent_framework_azurefunctions import AgentFunctionApp
3535
from agent_framework import ChatAgent # Microsoft Agent Framework
3636
from agent_framework.azure import AzureOpenAIChatClient # Azure OpenAI Chat client
3737
from azure.identity import DefaultAzureCredential # For RBAC authentication
@@ -281,9 +281,9 @@ def documentation_orchestration(context: DurableOrchestrationContext):
281281
Orchestration that generates comprehensive documentation by chaining agent calls.
282282
283283
This demonstrates:
284-
1. Getting an agent wrapper using context.get_agent()
285-
2. Creating a new conversation thread
286-
3. Making sequential agent calls with shared thread context
284+
1. Getting an agent wrapper using app.get_agent()
285+
2. Making sequential agent calls that share conversation context
286+
3. Coordinating multiple agents in a single orchestration
287287
4. Each call builds on the previous response
288288
289289
Example workflow:
@@ -308,7 +308,7 @@ def documentation_orchestration(context: DurableOrchestrationContext):
308308
user_query = "Generate comprehensive documentation"
309309

310310
# Get the DeepWiki agent wrapper for orchestration use
311-
deep_wiki = get_agent(context, "DeepWikiAgent")
311+
deep_wiki = app.get_agent(context, "DeepWikiAgent")
312312

313313
# First agent call: Generate initial wiki documentation
314314
initial_wiki = yield deep_wiki.run(
@@ -321,7 +321,7 @@ def documentation_orchestration(context: DurableOrchestrationContext):
321321
)
322322

323323
# Get the CodeStyle agent for generating complementary style guide
324-
code_style = get_agent(context, "CodeStyleAgent")
324+
code_style = app.get_agent(context, "CodeStyleAgent")
325325

326326
# Third agent call: Generate style guide that complements the wiki
327327
style_guide = yield code_style.run(

0 commit comments

Comments
 (0)