|
13 | 13 | import uuid |
14 | 14 | from typing import TYPE_CHECKING, Any, AsyncGenerator, cast |
15 | 15 |
|
| 16 | +from opentelemetry import trace as trace_api |
| 17 | + |
16 | 18 | from ..experimental.hooks import ( |
17 | 19 | AfterModelInvocationEvent, |
18 | 20 | AfterToolInvocationEvent, |
@@ -114,72 +116,75 @@ async def event_loop_cycle(agent: "Agent", invocation_state: dict[str, Any]) -> |
114 | 116 | parent_span=cycle_span, |
115 | 117 | model_id=model_id, |
116 | 118 | ) |
117 | | - |
118 | | - tool_specs = agent.tool_registry.get_all_tool_specs() |
119 | | - |
120 | | - agent.hooks.invoke_callbacks( |
121 | | - BeforeModelInvocationEvent( |
122 | | - agent=agent, |
123 | | - ) |
124 | | - ) |
125 | | - |
126 | | - try: |
127 | | - # TODO: To maintain backwards compatibility, we need to combine the stream event with invocation_state |
128 | | - # before yielding to the callback handler. This will be revisited when migrating to strongly |
129 | | - # typed events. |
130 | | - async for event in stream_messages(agent.model, agent.system_prompt, agent.messages, tool_specs): |
131 | | - if "callback" in event: |
132 | | - yield { |
133 | | - "callback": {**event["callback"], **(invocation_state if "delta" in event["callback"] else {})} |
134 | | - } |
135 | | - |
136 | | - stop_reason, message, usage, metrics = event["stop"] |
137 | | - invocation_state.setdefault("request_state", {}) |
| 119 | + with trace_api.use_span(model_invoke_span): |
| 120 | + tool_specs = agent.tool_registry.get_all_tool_specs() |
138 | 121 |
|
139 | 122 | agent.hooks.invoke_callbacks( |
140 | | - AfterModelInvocationEvent( |
| 123 | + BeforeModelInvocationEvent( |
141 | 124 | agent=agent, |
142 | | - stop_response=AfterModelInvocationEvent.ModelStopResponse( |
143 | | - stop_reason=stop_reason, |
144 | | - message=message, |
145 | | - ), |
146 | 125 | ) |
147 | 126 | ) |
148 | 127 |
|
149 | | - if model_invoke_span: |
150 | | - tracer.end_model_invoke_span(model_invoke_span, message, usage, stop_reason) |
151 | | - break # Success! Break out of retry loop |
152 | | - |
153 | | - except Exception as e: |
154 | | - if model_invoke_span: |
155 | | - tracer.end_span_with_error(model_invoke_span, str(e), e) |
156 | | - |
157 | | - agent.hooks.invoke_callbacks( |
158 | | - AfterModelInvocationEvent( |
159 | | - agent=agent, |
160 | | - exception=e, |
| 128 | + try: |
| 129 | + # TODO: To maintain backwards compatibility, we need to combine the stream event with invocation_state |
| 130 | + # before yielding to the callback handler. This will be revisited when migrating to strongly |
| 131 | + # typed events. |
| 132 | + async for event in stream_messages(agent.model, agent.system_prompt, agent.messages, tool_specs): |
| 133 | + if "callback" in event: |
| 134 | + yield { |
| 135 | + "callback": { |
| 136 | + **event["callback"], |
| 137 | + **(invocation_state if "delta" in event["callback"] else {}), |
| 138 | + } |
| 139 | + } |
| 140 | + |
| 141 | + stop_reason, message, usage, metrics = event["stop"] |
| 142 | + invocation_state.setdefault("request_state", {}) |
| 143 | + |
| 144 | + agent.hooks.invoke_callbacks( |
| 145 | + AfterModelInvocationEvent( |
| 146 | + agent=agent, |
| 147 | + stop_response=AfterModelInvocationEvent.ModelStopResponse( |
| 148 | + stop_reason=stop_reason, |
| 149 | + message=message, |
| 150 | + ), |
| 151 | + ) |
161 | 152 | ) |
162 | | - ) |
163 | 153 |
|
164 | | - if isinstance(e, ModelThrottledException): |
165 | | - if attempt + 1 == MAX_ATTEMPTS: |
166 | | - yield {"callback": {"force_stop": True, "force_stop_reason": str(e)}} |
167 | | - raise e |
| 154 | + if model_invoke_span: |
| 155 | + tracer.end_model_invoke_span(model_invoke_span, message, usage, stop_reason) |
| 156 | + break # Success! Break out of retry loop |
168 | 157 |
|
169 | | - logger.debug( |
170 | | - "retry_delay_seconds=<%s>, max_attempts=<%s>, current_attempt=<%s> " |
171 | | - "| throttling exception encountered " |
172 | | - "| delaying before next retry", |
173 | | - current_delay, |
174 | | - MAX_ATTEMPTS, |
175 | | - attempt + 1, |
| 158 | + except Exception as e: |
| 159 | + if model_invoke_span: |
| 160 | + tracer.end_span_with_error(model_invoke_span, str(e), e) |
| 161 | + |
| 162 | + agent.hooks.invoke_callbacks( |
| 163 | + AfterModelInvocationEvent( |
| 164 | + agent=agent, |
| 165 | + exception=e, |
| 166 | + ) |
176 | 167 | ) |
177 | | - time.sleep(current_delay) |
178 | | - current_delay = min(current_delay * 2, MAX_DELAY) |
179 | 168 |
|
180 | | - yield {"callback": {"event_loop_throttled_delay": current_delay, **invocation_state}} |
181 | | - else: |
182 | | - raise e |
| 169 | + if isinstance(e, ModelThrottledException): |
| 170 | + if attempt + 1 == MAX_ATTEMPTS: |
| 171 | + yield {"callback": {"force_stop": True, "force_stop_reason": str(e)}} |
| 172 | + raise e |
| 173 | + |
| 174 | + logger.debug( |
| 175 | + "retry_delay_seconds=<%s>, max_attempts=<%s>, current_attempt=<%s> " |
| 176 | + "| throttling exception encountered " |
| 177 | + "| delaying before next retry", |
| 178 | + current_delay, |
| 179 | + MAX_ATTEMPTS, |
| 180 | + attempt + 1, |
| 181 | + ) |
| 182 | + time.sleep(current_delay) |
| 183 | + current_delay = min(current_delay * 2, MAX_DELAY) |
| 184 | + |
| 185 | + yield {"callback": {"event_loop_throttled_delay": current_delay, **invocation_state}} |
| 186 | + else: |
| 187 | + raise e |
183 | 188 |
|
184 | 189 | try: |
185 | 190 | # Add message in trace and mark the end of the stream messages trace |
|
0 commit comments