Skip to content

Commit 6fc702d

Browse files
committed
fix event compaction to nonblocking
1 parent 585ebfd commit 6fc702d

2 files changed

Lines changed: 431 additions & 9 deletions

File tree

src/google/adk/runners.py

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,11 @@ class Runner:
118118
resumability_config: The resumability config for the application.
119119
"""
120120

121+
# Semaphore to limit concurrent event compaction tasks to prevent resource
122+
# exhaustion under high concurrency. Limits concurrent LLM calls and DB writes.
123+
# Shared across all Runner instances for global concurrency control.
124+
_compaction_semaphore: Optional[asyncio.Semaphore] = None
125+
121126
app_name: str
122127
"""The app name of the runner."""
123128
agent: BaseAgent
@@ -150,6 +155,7 @@ def __init__(
150155
credential_service: Optional[BaseCredentialService] = None,
151156
plugin_close_timeout: float = 5.0,
152157
auto_create_session: bool = False,
158+
max_concurrent_compactions: int = 10,
153159
):
154160
"""Initializes the Runner.
155161
@@ -179,6 +185,11 @@ def __init__(
179185
auto_create_session: Whether to automatically create a session when
180186
not found. Defaults to False. If False, a missing session raises
181187
ValueError with a helpful message.
188+
max_concurrent_compactions: Maximum number of concurrent event
189+
compaction tasks allowed. Defaults to 10. This limit is shared across
190+
all Runner instances to prevent resource exhaustion. Higher values
191+
allow more concurrent compactions but consume more resources (LLM
192+
API calls, database connections).
182193
183194
Raises:
184195
ValueError: If `app` is provided along with `agent` or `plugins`, or if
@@ -206,6 +217,8 @@ def __init__(
206217
) = self._infer_agent_origin(self.agent)
207218
self._app_name_alignment_hint: Optional[str] = None
208219
self._enforce_app_name_alignment()
220+
# Initialize or update the shared compaction semaphore
221+
self._initialize_compaction_semaphore(max_concurrent_compactions)
209222

210223
def _validate_runner_params(
211224
self,
@@ -320,6 +333,27 @@ def _infer_agent_origin(
320333
return None, origin_dir
321334
return origin_name, origin_dir
322335

336+
@classmethod
337+
def _initialize_compaction_semaphore(cls, limit: int) -> None:
338+
"""Initializes or updates the shared compaction semaphore.
339+
340+
This method ensures the class-level semaphore is initialized with the
341+
specified limit. If a semaphore already exists, it creates a new one
342+
with the updated limit (the old one will be garbage collected once all
343+
pending tasks complete).
344+
345+
Args:
346+
limit: Maximum number of concurrent compaction tasks allowed.
347+
"""
348+
if limit <= 0:
349+
raise ValueError(
350+
f'max_concurrent_compactions must be positive, got {limit}'
351+
)
352+
# Note: We can't use async lock here since this is called from __init__.
353+
# The semaphore creation itself is thread-safe, and in practice Runner
354+
# instances are created in the same event loop, so this is safe.
355+
cls._compaction_semaphore = asyncio.Semaphore(limit)
356+
323357
def _enforce_app_name_alignment(self) -> None:
324358
origin_name = self._agent_origin_app_name
325359
origin_dir = self._agent_origin_dir
@@ -401,8 +435,8 @@ def run(
401435
402436
If event compaction is enabled in the App configuration, it will be
403437
performed after all agent events for the current invocation have been
404-
yielded. The generator will only finish iterating after event
405-
compaction is complete.
438+
yielded. Compaction runs as a background task and does not block the
439+
generator from completing.
406440
407441
Args:
408442
user_id: The user ID of the session.
@@ -464,9 +498,10 @@ async def run_async(
464498
465499
If event compaction is enabled in the App configuration, it will be
466500
performed after all agent events for the current invocation have been
467-
yielded. The async generator will only finish iterating after event
468-
compaction is complete. However, this does not block new `run_async`
469-
calls for subsequent user queries, which can be started concurrently.
501+
yielded. Compaction runs as a background task and does not block the
502+
generator from completing, allowing the frontend to receive responses
503+
without delay. However, this does not block new `run_async` calls for
504+
subsequent user queries, which can be started concurrently.
470505
471506
Args:
472507
user_id: The user ID of the session.
@@ -552,11 +587,36 @@ async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]:
552587
# Run compaction after all events are yielded from the agent.
553588
# (We don't compact in the middle of an invocation, we only compact at
554589
# the end of an invocation.)
590+
# Run compaction as a background task to avoid blocking the generator
591+
# completion, which causes delays on the frontend. Use a semaphore to
592+
# limit concurrent compactions and prevent resource exhaustion under
593+
# high concurrency.
555594
if self.app and self.app.events_compaction_config:
556-
logger.debug('Running event compactor.')
557-
await _run_compaction_for_sliding_window(
558-
self.app, session, self.session_service
559-
)
595+
logger.debug('Scheduling event compactor in background.')
596+
597+
async def _run_compaction_with_error_handling():
598+
try:
599+
# Ensure semaphore is initialized (should always be after __init__)
600+
if self._compaction_semaphore is None:
601+
logger.warning(
602+
'Compaction semaphore not initialized, using default limit.'
603+
)
604+
self._initialize_compaction_semaphore(10)
605+
async with self._compaction_semaphore:
606+
await _run_compaction_for_sliding_window(
607+
self.app, session, self.session_service
608+
)
609+
except asyncio.CancelledError:
610+
logger.debug('Event compaction cancelled.')
611+
raise
612+
except Exception as e:
613+
logger.error(
614+
'Event compaction failed but not blocking response: %s',
615+
e,
616+
exc_info=True,
617+
)
618+
619+
asyncio.create_task(_run_compaction_with_error_handling())
560620

561621
async with Aclosing(_run_with_trace(new_message, invocation_id)) as agen:
562622
async for event in agen:

0 commit comments

Comments
 (0)