from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any, Callable

from openai import AsyncOpenAI

from ..models._openai_shared import get_default_openai_client
from .openai_conversations_session import OpenAIConversationsSession
from .session import (
    OpenAIResponsesCompactionArgs,
    OpenAIResponsesCompactionAwareSession,
    SessionABC,
)

if TYPE_CHECKING:
    from ..items import TResponseInputItem
    from .session import Session

logger = logging.getLogger("openai-agents.openai.compaction")

DEFAULT_COMPACTION_THRESHOLD = 10


def select_compaction_candidate_items(
    items: list[TResponseInputItem],
) -> list[TResponseInputItem]:
    """Select compaction candidate items.

    Excludes user messages and compaction items.
    """

    def _is_user_message(item: TResponseInputItem) -> bool:
        if not isinstance(item, dict):
            return False
        if item.get("type") == "message":
            return item.get("role") == "user"
        return item.get("role") == "user" and "content" in item

    return [
        item
        for item in items
        if not (
            _is_user_message(item) or (isinstance(item, dict) and item.get("type") == "compaction")
        )
    ]
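
# A quick illustration of the selection rule above (hypothetical item shapes,
# not taken from the source): user messages and compaction markers are dropped,
# everything else is kept as a candidate.
#
#   items = [
#       {"type": "message", "role": "user", "content": "hi"},
#       {"type": "message", "role": "assistant", "content": "hello"},
#       {"type": "compaction"},
#   ]
#   select_compaction_candidate_items(items)  # -> [the assistant message]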


def default_should_trigger_compaction(context: dict[str, Any]) -> bool:
    """Default decision: compact once DEFAULT_COMPACTION_THRESHOLD (10) candidate items exist."""
    return len(context["compaction_candidate_items"]) >= DEFAULT_COMPACTION_THRESHOLD
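
# The decision hook receives the same context dict that run_compaction builds
# (keys: "response_id", "compaction_candidate_items", "session_items"), so a
# custom policy only needs that shape. A hypothetical stricter variant:
#
#   def trigger_after_fifty(context: dict[str, Any]) -> bool:
#       return len(context["compaction_candidate_items"]) >= 50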


def is_openai_model_name(model: str) -> bool:
    """Check whether a model name follows OpenAI conventions (gpt-*, o<digit>*, or ft: variants)."""
    trimmed = model.strip()
    if not trimmed:
        return False

    # Handle fine-tuned models: ft:gpt-4.1:org:proj:suffix
    without_ft_prefix = trimmed[3:] if trimmed.startswith("ft:") else trimmed
    root = without_ft_prefix.split(":", 1)[0]

    # Allow gpt-* and o<digit>* models
    if root.startswith("gpt-"):
        return True
    if root.startswith("o") and root[1:2].isdigit():
        return True

    return False
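
# Examples derived from the checks above, for clarity:
#   is_openai_model_name("gpt-4.1")              -> True   (gpt-* prefix)
#   is_openai_model_name("o3-mini")              -> True   (o + digit)
#   is_openai_model_name("ft:gpt-4.1:org::abc")  -> True   (ft: prefix stripped)
#   is_openai_model_name("omni")                 -> False  (no digit after "o")
#   is_openai_model_name("claude-3")             -> False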


class OpenAIResponsesCompactionSession(SessionABC, OpenAIResponsesCompactionAwareSession):
    """Session decorator that triggers responses.compact when stored history grows.

    Works with OpenAI Responses API models only. Wraps any Session (except
    OpenAIConversationsSession) and automatically calls the OpenAI responses.compact
    API after each turn when the decision hook returns True.
    """

    def __init__(
        self,
        session_id: str,
        underlying_session: Session,
        *,
        client: AsyncOpenAI | None = None,
        model: str = "gpt-4.1",
        should_trigger_compaction: Callable[[dict[str, Any]], bool] | None = None,
    ):
        """Initialize the compaction session.

        Args:
            session_id: Identifier for this session.
            underlying_session: Session store that holds the compacted history. Cannot be
                an OpenAIConversationsSession.
            client: OpenAI client for responses.compact API calls. Defaults to
                get_default_openai_client() or a new AsyncOpenAI().
            model: Model to use for responses.compact. Defaults to "gpt-4.1". Must be an
                OpenAI model name (gpt-*, o<digit>*, or an ft: variant of one).
            should_trigger_compaction: Custom decision hook. Defaults to triggering when
                DEFAULT_COMPACTION_THRESHOLD (10) or more compaction candidates exist.
        """
        if isinstance(underlying_session, OpenAIConversationsSession):
            raise ValueError(
                "OpenAIResponsesCompactionSession cannot wrap OpenAIConversationsSession "
                "because it manages its own history on the server."
            )

        if not is_openai_model_name(model):
            raise ValueError(f"Unsupported model for OpenAI responses compaction: {model}")

        self.session_id = session_id
        self.underlying_session = underlying_session
        self._client = client
        self.model = model
        self.should_trigger_compaction = (
            should_trigger_compaction or default_should_trigger_compaction
        )

        # Caches for incremental candidate tracking; None means "not yet loaded
        # or invalidated", and _ensure_compaction_candidates() repopulates them.
        self._compaction_candidate_items: list[TResponseInputItem] | None = None
        self._session_items: list[TResponseInputItem] | None = None
        self._response_id: str | None = None

    @property
    def client(self) -> AsyncOpenAI:
        if self._client is None:
            self._client = get_default_openai_client() or AsyncOpenAI()
        return self._client

    async def run_compaction(self, args: OpenAIResponsesCompactionArgs | None = None) -> None:
        """Run compaction via the responses.compact API."""
        if args and args.get("response_id"):
            self._response_id = args["response_id"]

        if not self._response_id:
            raise ValueError(
                "OpenAIResponsesCompactionSession.run_compaction requires a response_id"
            )

        compaction_candidate_items, session_items = await self._ensure_compaction_candidates()

        force = args.get("force", False) if args else False
        should_compact = force or self.should_trigger_compaction(
            {
                "response_id": self._response_id,
                "compaction_candidate_items": compaction_candidate_items,
                "session_items": session_items,
            }
        )

        if not should_compact:
            logger.debug(f"skip: decision hook declined compaction for {self._response_id}")
            return

        logger.debug(f"compact: start for {self._response_id} using {self.model}")

        compacted = await self.client.responses.compact(
            previous_response_id=self._response_id,
            model=self.model,
        )

        # Replace the stored history with the compacted output.
        await self.underlying_session.clear_session()
        output_items: list[TResponseInputItem] = []
        if compacted.output:
            for item in compacted.output:
                if isinstance(item, dict):
                    output_items.append(item)
                else:
                    # Suppress Pydantic literal warnings: responses.compact can return
                    # user-style input_text content inside ResponseOutputMessage.
                    output_items.append(
                        item.model_dump(exclude_unset=True, warnings=False)  # type: ignore
                    )

        if output_items:
            await self.underlying_session.add_items(output_items)

        self._compaction_candidate_items = select_compaction_candidate_items(output_items)
        self._session_items = output_items

        logger.debug(
            f"compact: done for {self._response_id} "
            f"(output={len(output_items)}, candidates={len(self._compaction_candidate_items)})"
        )

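    # The overrides below keep the in-memory caches consistent with the
    # underlying store: add_items extends them incrementally, pop_item
    # conservatively invalidates them, and clear_session resets them to empty.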
    async def get_items(self, limit: int | None = None) -> list[TResponseInputItem]:
        return await self.underlying_session.get_items(limit)

    async def add_items(self, items: list[TResponseInputItem]) -> None:
        await self.underlying_session.add_items(items)
        if self._compaction_candidate_items is not None:
            new_candidates = select_compaction_candidate_items(items)
            if new_candidates:
                self._compaction_candidate_items.extend(new_candidates)
        if self._session_items is not None:
            self._session_items.extend(items)

    async def pop_item(self) -> TResponseInputItem | None:
        popped = await self.underlying_session.pop_item()
        if popped:
            self._compaction_candidate_items = None
            self._session_items = None
        return popped

    async def clear_session(self) -> None:
        await self.underlying_session.clear_session()
        self._compaction_candidate_items = []
        self._session_items = []

    async def _ensure_compaction_candidates(
        self,
    ) -> tuple[list[TResponseInputItem], list[TResponseInputItem]]:
        """Lazy-load and cache compaction candidates, returning defensive copies."""
        if self._compaction_candidate_items is not None and self._session_items is not None:
            return (self._compaction_candidate_items[:], self._session_items[:])

        history = await self.underlying_session.get_items()
        candidates = select_compaction_candidate_items(history)
        self._compaction_candidate_items = candidates
        self._session_items = history

        logger.debug(
            f"candidates: initialized (history={len(history)}, candidates={len(candidates)})"
        )
        return (candidates[:], history[:])
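
# Minimal usage sketch (assumptions: SQLiteSession is available as an underlying
# store and last_response_id comes from the most recent Responses API turn;
# adjust to your setup):
#
#   session = OpenAIResponsesCompactionSession(
#       "my-session",
#       underlying_session=SQLiteSession("my-session"),
#   )
#   # ... run a turn against the Responses API, then:
#   await session.run_compaction({"response_id": last_response_id})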