diff --git a/.cursor/commands/ci-wait-verify-fix.md b/.cursor/commands/ci-wait-verify-fix.md new file mode 100644 index 00000000..5fce52d7 --- /dev/null +++ b/.cursor/commands/ci-wait-verify-fix.md @@ -0,0 +1,45 @@ +## Wait for CI and Fix Failures + +Execute this workflow after pushing to a PR branch. + +### Step 1: Run the CI Wait Script + +```bash +./scripts/ci-wait.sh +``` + +Use `timeout: 660000` (11 min) and `required_permissions: ["network"]`. + +**Exit codes:** +- `0` = All workflows passed → Done +- `1` = One or more workflows failed → Continue to Step 2 +- `2` = Timeout → Report to user, ask how to proceed + +### Step 2: Fetch Failed Logs + +For each failed workflow from the script output: + +```bash +gh run view --log-failed +``` + +### Step 3: Analyze and Fix + +1. Parse the log output to identify the root cause (test failure, lint error, type error, build error) +2. Locate the relevant file(s) and line(s) +3. Apply the minimal fix +4. Commit with message: `fix: resolve CI failure - ` + +### Step 4: Push and Retry + +```bash +git push +``` + +Return to Step 1. Maximum 3 retry attempts before stopping and reporting to user. + +### Constraints + +- Do not introduce new functionality while fixing +- Keep fixes minimal and focused on the specific failure +- If the failure is unclear or requires design decisions, stop and ask the user diff --git a/.cursor/commands/review-and-learn.md b/.cursor/commands/review-and-learn.md new file mode 100644 index 00000000..a50845ce --- /dev/null +++ b/.cursor/commands/review-and-learn.md @@ -0,0 +1,5 @@ +# Review and Learn + +Review the conversation history and learn from the mistakes. + +Track in a table in the `AGENTS.md` file: Anti-patterns | Correct Behavior diff --git a/.cursor/commands/review-and-verify.md b/.cursor/commands/review-and-verify.md new file mode 100644 index 00000000..eede282b --- /dev/null +++ b/.cursor/commands/review-and-verify.md @@ -0,0 +1,3 @@ +# Review and Verify + +Re-read `AGENTS.md` and verify your work follows the guidelines. diff --git a/.cursor/rules/AGENTS.mdc b/.cursor/rules/AGENTS.mdc new file mode 100644 index 00000000..f4e119d0 --- /dev/null +++ b/.cursor/rules/AGENTS.mdc @@ -0,0 +1,5 @@ +--- +alwaysApply: true +--- + +Read `AGENTS.md` - non-negotiable. \ No newline at end of file diff --git a/.cursor/rules/absolutes.mdc b/.cursor/rules/absolutes.mdc deleted file mode 100644 index 7e62eb5a..00000000 --- a/.cursor/rules/absolutes.mdc +++ /dev/null @@ -1,12 +0,0 @@ ---- -alwaysApply: true ---- - -CRITICAL RULES OR YOU'RE FIRED: - -- SRP -- SOLID -- YAGNI -- KISS -- CLEAN CODE -- DRY \ No newline at end of file diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..d433349c --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,9 @@ +# Guidelines + +## Follow best practices + +- SRP (most important) +- SOLID +- YAGNI +- KISS +- CLEAN CODE diff --git a/TESTING.md b/TESTING.md new file mode 100644 index 00000000..1ecfaeef --- /dev/null +++ b/TESTING.md @@ -0,0 +1,66 @@ +# Testing Philosophy + +## Hierarchy + +| Level | Purpose | Mocking | +| --------------- | ----------------------------- | ------------------------ | +| **E2E** | Happy paths via real examples | None | +| **Integration** | Edge cases, error handling | External boundaries only | +| **Unit** | Pure logic, branches | Everything | + +### Language-Specific Patterns + +| Language | E2E | Integration | Unit | Location | +| ---------- | --------------- | ----------------------- | ---------------- | ------------ | +| TypeScript | `*.e2e.test.ts` | `*.integration.test.ts` | `*.unit.test.ts` | `__tests__/` | +| Python | `test_*_e2e.py` | `test_*_integration.py` | `test_*.py` | `tests/` | +| Go | `*_e2e_test.go` | `*_integration_test.go` | `*_test.go` | same package | + +## Workflow + +1. **Spec first**: Write a `.feature` file in `specs/`. Use tags: `@e2e`, `@integration`, `@unit`. +2. **Challenge**: LLM/reviewer challenges missing edge cases before implementation. +3. **Examples drive E2E**: Working examples in `examples/` are wrapped by e2e tests. +4. **Implement**: Outside-in test driven (TDD). Red → Green → Refactor. + +## Decision Tree + +```text +Is this a happy path demonstrating SDK usage? + → E2E (wrap an example) + +Does it test orchestration between internal modules or external API behavior? + → Integration (mock external boundaries) + +Is it pure logic or a single class in isolation? + → Unit (mock collaborators) + +Is it a regression from production? + → Add test at the LOWEST sufficient level (unit > integration > e2e) +``` + +## Scenario Design + +Each scenario should test **one invariant**. When deciding whether to extend an existing scenario or create a new one: + +- **Extend** (add `And`/`But`): The new assertion is a natural consequence of the same behavior +- **New scenario**: The assertion tests a distinct invariant that could fail independently + +Example: "Cache returns stale data" and "Cache key includes version" are orthogonal invariants — separate scenarios. If one fails, you immediately know which contract broke. + +## What We Don't Test + +- Type definitions +- Simple pass-throughs with no logic +- Third-party library internals +- Constants/config (unless dynamic) + +## Regression Policy + +Edge cases not covered upfront are handled via regression tests. When a bug is found: + +1. Reproduce with a failing test +2. Add test at the lowest sufficient level +3. Fix and verify green + +This keeps the suite lean while ensuring real failures never recur. diff --git a/javascript/examples/vitest/tests/scenario-expert-realtime.test.ts b/javascript/examples/vitest/tests/scenario-expert-realtime.test.ts index 850ed9b3..3e53af86 100644 --- a/javascript/examples/vitest/tests/scenario-expert-realtime.test.ts +++ b/javascript/examples/vitest/tests/scenario-expert-realtime.test.ts @@ -96,7 +96,7 @@ describe("Scenario Expert Agent (Realtime API)", () => { // Judge with audio transcription scenario.judgeAgent({ criteria: [ - "Agent explains what LangWatch Scenario is", + "Agent explains what Scenario is or how it helps test AI agents", "Agent is helpful and informative", ], }) diff --git a/javascript/src/agents/judge/__tests__/judge-span-digest-formatter.test.ts b/javascript/src/agents/judge/__tests__/judge-span-digest-formatter.test.ts index f77a0d18..52177789 100644 --- a/javascript/src/agents/judge/__tests__/judge-span-digest-formatter.test.ts +++ b/javascript/src/agents/judge/__tests__/judge-span-digest-formatter.test.ts @@ -34,10 +34,9 @@ const formatter = new JudgeSpanDigestFormatter(); describe("JudgeSpanDigestFormatter", () => { describe("when no spans", () => { it("returns empty digest marker", () => { - expect(formatter.format([])).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - No spans recorded." - `); + expect(formatter.format([])).toMatchInlineSnapshot( + `"No spans recorded."` + ); }); }); @@ -56,8 +55,7 @@ describe("JudgeSpanDigestFormatter", () => { }); expect(formatter.format([span])).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - Spans: 1 | Total Duration: 500ms + "Spans: 1 | Total Duration: 500ms [1] 2023-11-14T22:13:20.000Z llm.chat (500ms) gen_ai.prompt: Hello @@ -92,8 +90,7 @@ describe("JudgeSpanDigestFormatter", () => { ]; expect(formatter.format(spans)).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - Spans: 3 | Total Duration: 2.05s + "Spans: 3 | Total Duration: 2.05s [1] 2023-11-14T22:13:20.000Z first (200ms) @@ -131,8 +128,7 @@ describe("JudgeSpanDigestFormatter", () => { ]; expect(formatter.format(spans)).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - Spans: 3 | Total Duration: 1.00s + "Spans: 3 | Total Duration: 1.00s [1] 2023-11-14T22:13:20.000Z agent.run (1.00s) @@ -168,8 +164,7 @@ describe("JudgeSpanDigestFormatter", () => { ]; expect(formatter.format(spans)).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - Spans: 3 | Total Duration: 2.00s + "Spans: 3 | Total Duration: 2.00s [1] 2023-11-14T22:13:20.000Z root (2.00s) @@ -198,8 +193,7 @@ describe("JudgeSpanDigestFormatter", () => { }); expect(formatter.format([span])).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - Spans: 1 | Total Duration: 100ms + "Spans: 1 | Total Duration: 100ms [1] 2023-11-14T22:13:20.000Z llm.chat (100ms) gen_ai.prompt: What is the weather in Paris? @@ -231,8 +225,7 @@ describe("JudgeSpanDigestFormatter", () => { ]; expect(formatter.format(spans)).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - Spans: 2 | Total Duration: 300ms + "Spans: 2 | Total Duration: 300ms [1] 2023-11-14T22:13:20.000Z successful.operation (100ms) @@ -265,8 +258,7 @@ describe("JudgeSpanDigestFormatter", () => { }); expect(formatter.format([span])).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - Spans: 1 | Total Duration: 1.00s + "Spans: 1 | Total Duration: 1.00s [1] 2023-11-14T22:13:20.000Z llm.stream (1.00s) [event] token.generated @@ -296,8 +288,7 @@ describe("JudgeSpanDigestFormatter", () => { }); expect(formatter.format([span])).toMatchInlineSnapshot(` - "=== OPENTELEMETRY TRACES === - Spans: 1 | Total Duration: 100ms + "Spans: 1 | Total Duration: 100ms [1] 2023-11-14T22:13:20.000Z test (100ms) relevant.attribute: should-appear @@ -331,7 +322,7 @@ describe("JudgeSpanDigestFormatter", () => { expect(result).toContain(longContent); expect(result).toContain("[DUPLICATE - SEE ABOVE]"); expect(result.indexOf(longContent)).toBeLessThan( - result.indexOf("[DUPLICATE - SEE ABOVE]"), + result.indexOf("[DUPLICATE - SEE ABOVE]") ); }); diff --git a/javascript/src/agents/judge/judge-span-digest-formatter.ts b/javascript/src/agents/judge/judge-span-digest-formatter.ts index c41c19d1..3b898b33 100644 --- a/javascript/src/agents/judge/judge-span-digest-formatter.ts +++ b/javascript/src/agents/judge/judge-span-digest-formatter.ts @@ -36,7 +36,7 @@ export class JudgeSpanDigestFormatter { if (spans.length === 0) { this.logger.debug("No spans to format"); - return "=== OPENTELEMETRY TRACES ===\nNo spans recorded."; + return "No spans recorded."; } const sortedSpans = this.sortByStartTime(spans); @@ -49,7 +49,6 @@ export class JudgeSpanDigestFormatter { }); const lines: string[] = [ - "=== OPENTELEMETRY TRACES ===", `Spans: ${spans.length} | Total Duration: ${this.formatDuration( totalDuration )}`, diff --git a/javascript/src/execution/scenario-execution.ts b/javascript/src/execution/scenario-execution.ts index bc47d8cb..7ba110bc 100644 --- a/javascript/src/execution/scenario-execution.ts +++ b/javascript/src/execution/scenario-execution.ts @@ -500,11 +500,13 @@ export class ScenarioExecution implements ScenarioExecutionLike { judgmentRequest: boolean = false ): Promise { const agent = this.agents[idx]; + const agentName = agent.name ?? agent.constructor.name; + this.logger.debug(`[${this.config.id}] callAgent started`, { agentIdx: idx, role, judgmentRequest, - agentName: agent.name ?? agent.constructor.name, + agentName, pendingMessagesCount: this.pendingMessages.get(idx)?.length ?? 0, }); @@ -526,108 +528,112 @@ export class ScenarioExecution implements ScenarioExecutionLike { : context.active(); const agentSpanName = `${ - agent.name ?? agent.constructor.name !== Object.prototype.constructor.name + agentName !== Object.prototype.constructor.name ? agent.constructor.name : "Agent" }.call`; - await this.tracer.withActiveSpan( - agentSpanName, - { - attributes: { - "langwatch.thread.id": this.state.threadId, + try { + await this.tracer.withActiveSpan( + agentSpanName, + { + attributes: { + "langwatch.thread.id": this.state.threadId, + }, }, - }, - agentContext, - async (agentSpan) => { - agentSpan.setType("agent"); - - // Set input for the span - agentSpan.setInput("chat_messages", this.state.messages); - - const agentResponse = await agent.call(agentInput); - const endTime = Date.now(); - const duration = endTime - startTime; - - this.logger.debug(`[${this.config.id}] Agent responded`, { - agentIdx: idx, - duration, - responseType: typeof agentResponse, - isScenarioResult: + agentContext, + async (agentSpan) => { + agentSpan.setType("agent"); + + // Set input for the span + agentSpan.setInput("chat_messages", this.state.messages); + + const agentResponse = await agent.call(agentInput); + const endTime = Date.now(); + const duration = endTime - startTime; + + this.logger.debug(`[${this.config.id}] Agent responded`, { + agentIdx: idx, + duration, + responseType: typeof agentResponse, + isScenarioResult: + agentResponse && + typeof agentResponse === "object" && + "success" in agentResponse, + }); + + this.addAgentTime(idx, duration); + this.pendingMessages.delete(idx); + + if ( agentResponse && typeof agentResponse === "object" && - "success" in agentResponse, - }); - - this.addAgentTime(idx, duration); - this.pendingMessages.delete(idx); - - if ( - agentResponse && - typeof agentResponse === "object" && - "success" in agentResponse - ) { - this.logger.debug( - `[${this.config.id}] Agent returned ScenarioResult`, - { - success: (agentResponse as { success: boolean }).success, - } - ); - // JudgeResult is automatically augmented with messages by setResult - this.setResult(agentResponse); - return; - } + "success" in agentResponse + ) { + this.logger.debug( + `[${this.config.id}] Agent returned ScenarioResult`, + { + success: (agentResponse as { success: boolean }).success, + } + ); + // JudgeResult is automatically augmented with messages by setResult + this.setResult(agentResponse); + return; + } - const messages = convertAgentReturnTypesToMessages( - agentResponse, - role === AgentRole.USER ? "user" : "assistant" - ); + const messages = convertAgentReturnTypesToMessages( + agentResponse, + role === AgentRole.USER ? "user" : "assistant" + ); - // Set output for the span - if (messages.length > 0) { - agentSpan.setOutput("chat_messages", messages); - } + // Set output for the span + if (messages.length > 0) { + agentSpan.setOutput("chat_messages", messages); + } - // Set metrics if available (would need to be extracted from agent response) - const metrics: Record = { - duration: endTime - startTime, - }; - - // Add token usage if available from agent response - if (agentResponse && typeof agentResponse === "object") { - const usage = ( - agentResponse as { - usage?: { - prompt_tokens?: number; - completion_tokens?: number; - total_tokens?: number; - }; + // Set metrics if available (would need to be extracted from agent response) + const metrics: Record = { + duration: endTime - startTime, + }; + + // Add token usage if available from agent response + if (agentResponse && typeof agentResponse === "object") { + const usage = ( + agentResponse as { + usage?: { + prompt_tokens?: number; + completion_tokens?: number; + total_tokens?: number; + }; + } + ).usage; + if (usage) { + if (usage.prompt_tokens !== undefined) + metrics.promptTokens = usage.prompt_tokens; + if (usage.completion_tokens !== undefined) + metrics.completionTokens = usage.completion_tokens; + if (usage.total_tokens !== undefined) + metrics.totalTokens = usage.total_tokens; } - ).usage; - if (usage) { - if (usage.prompt_tokens !== undefined) - metrics.promptTokens = usage.prompt_tokens; - if (usage.completion_tokens !== undefined) - metrics.completionTokens = usage.completion_tokens; - if (usage.total_tokens !== undefined) - metrics.totalTokens = usage.total_tokens; } - } - agentSpan.setMetrics(metrics); + agentSpan.setMetrics(metrics); - // Add traceId to each message for proper correlation - const traceId = agentSpan.spanContext().traceId.toString(); + // Add traceId to each message for proper correlation + const traceId = agentSpan.spanContext().traceId.toString(); - for (const message of messages) { - this.state.addMessage({ - ...message, - traceId, - }); - this.broadcastMessage(message, idx); + for (const message of messages) { + this.state.addMessage({ + ...message, + traceId, + }); + this.broadcastMessage(message, idx); + } } - } - ); + ); + } catch (error) { + throw new Error(`[${agentName}] ${error}`, { cause: error }); + } } /** diff --git a/python/examples/test_audio_to_audio.py b/python/examples/test_audio_to_audio.py index e0249730..5c1e295f 100644 --- a/python/examples/test_audio_to_audio.py +++ b/python/examples/test_audio_to_audio.py @@ -62,6 +62,7 @@ def __init__(self): SET_ID = "multimodal-audio-to-audio-test" +@pytest.mark.flaky(reruns=2) @pytest.mark.asyncio async def test_audio_to_audio(): """ @@ -90,11 +91,8 @@ async def test_audio_to_audio(): "content": [ { "type": "text", - "text": """ - Answer the question in the audio. - If you're not sure, you're required to take a best guess. - After you've guessed, you must repeat the question and say what format the input was in (audio or text) - """, + "text": """Answer the question in the audio. If unsure, take your best guess. + Also mention that you received this as audio input.""", }, { "type": "file", @@ -110,9 +108,8 @@ async def test_audio_to_audio(): scenario.JudgeAgent( model="openai/gpt-4o", criteria=[ - "The agent correctly guesses it's a male voice", - "The agent repeats the question", - "The agent says what format the input was in (audio or text)", + "The agent identifies or guesses the voice is male", + "The agent acknowledges the input was audio (not text)", ], ) ) diff --git a/python/examples/test_running_in_parallel.py b/python/examples/test_running_in_parallel.py index 0fccad03..81b57215 100644 --- a/python/examples/test_running_in_parallel.py +++ b/python/examples/test_running_in_parallel.py @@ -69,6 +69,7 @@ async def test_vegetarian_recipe_agent(): @pytest.mark.agent_test @pytest.mark.asyncio_concurrent(group="vegetarian_recipe_agent") +@pytest.mark.flaky(reruns=2) async def test_user_is_hungry(): # Define the scenario result = await scenario.run( diff --git a/python/examples/test_span_based_evaluation_langwatch.py b/python/examples/test_span_based_evaluation_langwatch.py new file mode 100644 index 00000000..556ad851 --- /dev/null +++ b/python/examples/test_span_based_evaluation_langwatch.py @@ -0,0 +1,206 @@ +""" +Span-Based Evaluation with LangWatch Decorators + +This example demonstrates using LangWatch's higher-level instrumentation API +to create custom spans that the judge can evaluate. This approach is useful when: +- You want a simpler, more Pythonic API +- You're already using LangWatch for observability +- You prefer decorators over context managers + +Key concepts: +- Use `@langwatch.span()` decorator for function-level spans +- Use `langwatch.get_current_span()` to access and update the current span +- Child spans automatically inherit thread context from scenario executor + +See also: test_span_based_evaluation_native_otel.py for native OpenTelemetry API. +""" + +import asyncio +import json +import pytest +import scenario +import langwatch +from typing import Any, Callable +from function_schema import get_function_schema +import litellm + + +def check_inventory(product_id: str) -> dict[str, Any]: + """ + Check if an item is in stock. + + Args: + product_id: The product ID to check + + Returns: + Inventory status for the product + """ + return {"in_stock": True, "quantity": 42, "product_id": product_id} + + +class LangWatchDecoratorAgent(scenario.AgentAdapter): + """ + Agent instrumented with LangWatch decorator-based spans. + + Uses @langwatch.span() decorator and langwatch.get_current_span() + to create spans that are visible to the judge during evaluation. + """ + + @langwatch.span(name="http.fraud_check", type="span") + async def _check_fraud(self) -> None: + """Simulate an HTTP call to a fraud detection service.""" + span = langwatch.get_current_span() + span.set_attributes( + { + "http.method": "POST", + "http.url": "https://api.fraudservice.com/check", + "http.status_code": 200, + } + ) + await asyncio.sleep(0.03) + span.set_attributes({"fraud.risk_score": 0.1}) + + @langwatch.span(name="db.query", type="span") + async def _query_database(self) -> None: + """Simulate a database query.""" + span = langwatch.get_current_span() + span.set_attributes( + { + "db.system": "postgresql", + "db.operation": "SELECT", + "db.statement": "SELECT * FROM customers WHERE id = $1", + } + ) + await asyncio.sleep(0.02) + + @langwatch.span(type="tool") + def _execute_tool( + self, + tool_name: str, + tool_func: Callable[..., dict[str, Any]], + tool_args: dict[str, Any], + ) -> dict[str, Any]: + """Execute a tool with LangWatch span instrumentation.""" + span = langwatch.get_current_span() + span.set_attributes( + { + "tool.name": tool_name, + "tool.arguments": json.dumps(tool_args), + } + ) + result = tool_func(**tool_args) + span.set_attributes({"tool.result": json.dumps(result)}) + return result + + async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes: + """Process input and return agent response.""" + await self._check_fraud() + await self._query_database() + + tools = [check_inventory] + response = litellm.completion( + model="openai/gpt-4.1-mini", + messages=[ + { + "role": "system", + "content": """You are an order processing assistant. +When asked about products, use the check_inventory tool.""", + }, + *input.messages, + ], + tools=[ + {"type": "function", "function": get_function_schema(tool)} + for tool in tools + ], + tool_choice="auto", + ) + + message = response.choices[0].message # type: ignore[union-attr] + + if message.tool_calls: + tools_by_name = {tool.__name__: tool for tool in tools} + tool_responses = [] + + for tool_call in message.tool_calls: + tool_name = tool_call.function.name + tool_args = json.loads(tool_call.function.arguments) + + if tool_name in tools_by_name: + result = self._execute_tool( + tool_name=tool_name, + tool_func=tools_by_name[tool_name], + tool_args=tool_args, + ) + + tool_responses.append( + { + "role": "tool", + "tool_call_id": tool_call.id, + "content": json.dumps(result), + } + ) + + follow_up = litellm.completion( + model="openai/gpt-4.1-mini", + messages=[ + { + "role": "system", + "content": "You are an order processing assistant.", + }, + *input.messages, + message, + *tool_responses, + ], + ) + return follow_up.choices[0].message.content or "" # type: ignore[union-attr] + + return message.content or "" + + +@pytest.mark.agent_test +@pytest.mark.asyncio +async def test_langwatch_decorator_span_evaluation(): + """ + Verifies that LangWatch decorator spans are visible to the judge. + + The judge can verify: + - HTTP call spans (http.fraud_check) + - Database query spans (db.query) + - Tool execution spans (_execute_tool with tool type) + """ + result = await scenario.run( + name="langwatch decorator span evaluation", + description=""" + A customer asks about product SKU-123 availability. + The agent should check inventory and respond. + """, + agents=[ + LangWatchDecoratorAgent(), + scenario.UserSimulatorAgent(model="openai/gpt-4.1-mini"), + scenario.JudgeAgent( + model="openai/gpt-4.1", + criteria=[ + "A fraud check HTTP call was made (http.fraud_check span exists)", + "A database query was performed (db.query span exists)", + "The check_inventory tool was called for the product", + ], + ), + ], + script=[ + scenario.user("Is product SKU-123 in stock?"), + scenario.agent(), + scenario.judge(), + ], + max_turns=5, + set_id="python-examples", + ) + + print(f"\nResult: {result}") + print(f"Success: {result.success}") + print(f"Reasoning: {result.reasoning}") + if result.passed_criteria: + print(f"Passed criteria: {result.passed_criteria}") + if result.failed_criteria: + print(f"Failed criteria: {result.failed_criteria}") + + assert result.success, f"Expected success but got: {result.reasoning}" diff --git a/python/examples/test_span_based_evaluation_native_otel.py b/python/examples/test_span_based_evaluation_native_otel.py new file mode 100644 index 00000000..bfb9cd1a --- /dev/null +++ b/python/examples/test_span_based_evaluation_native_otel.py @@ -0,0 +1,196 @@ +""" +Span-Based Evaluation with Native OpenTelemetry + +This example demonstrates using the native OpenTelemetry API directly to create +custom spans that the judge can evaluate. This approach is useful when: +- You have existing OpenTelemetry instrumentation +- You need fine-grained control over span attributes +- You're integrating with other OpenTelemetry-compatible tools + +Key concepts: +- Use `trace.get_tracer()` to get a tracer instance +- Use `tracer.start_as_current_span()` context manager for spans +- Child spans automatically inherit thread context from scenario executor +- Use `span.set_attribute()` for dynamic attributes + +See also: test_span_based_evaluation_langwatch.py for LangWatch's higher-level API. +""" + +import asyncio +import json +import pytest +import scenario +from opentelemetry import trace +from function_schema import get_function_schema +import litellm + + +# Native OpenTelemetry: Get a tracer for creating custom spans +tracer = trace.get_tracer("order-processing-agent") + + +def check_inventory(product_id: str) -> dict: + """ + Check if an item is in stock. + + Args: + product_id: The product ID to check + + Returns: + Inventory status for the product + """ + return {"in_stock": True, "quantity": 42, "product_id": product_id} + + +class NativeOtelAgent(scenario.AgentAdapter): + """ + Agent instrumented with native OpenTelemetry spans. + + Uses the standard OpenTelemetry API (trace.get_tracer, start_as_current_span) + to create spans that are visible to the judge during evaluation. + """ + + async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes: + """Process input and return agent response.""" + # Native OTEL: Create span with context manager + with tracer.start_as_current_span( + "http.fraud_check", + attributes={ + "http.method": "POST", + "http.url": "https://api.fraudservice.com/check", + "http.status_code": 200, + }, + ) as fraud_span: + await asyncio.sleep(0.03) # Simulate network latency + # Native OTEL: Add dynamic attributes after span creation + fraud_span.set_attribute("fraud.risk_score", 0.1) + + # Native OTEL: Another span with initial attributes only + with tracer.start_as_current_span( + "db.query", + attributes={ + "db.system": "postgresql", + "db.operation": "SELECT", + "db.statement": "SELECT * FROM customers WHERE id = $1", + }, + ): + await asyncio.sleep(0.02) # Simulate DB latency + + # LLM call with tool usage + tools = [check_inventory] + response = litellm.completion( + model="openai/gpt-4.1-mini", + messages=[ + { + "role": "system", + "content": """You are an order processing assistant. +When asked about products, use the check_inventory tool.""", + }, + *input.messages, + ], + tools=[ + {"type": "function", "function": get_function_schema(tool)} + for tool in tools + ], + tool_choice="auto", + ) + + message = response.choices[0].message # type: ignore[union-attr] + + # Handle tool calls + if message.tool_calls: + tools_by_name = {tool.__name__: tool for tool in tools} + tool_responses = [] + + for tool_call in message.tool_calls: + tool_name = tool_call.function.name + tool_args = json.loads(tool_call.function.arguments) + + if tool_name in tools_by_name: + # Native OTEL: Span for tool execution with dynamic result + with tracer.start_as_current_span( + f"tool.{tool_name}", + attributes={ + "tool.name": tool_name, + "tool.arguments": json.dumps(tool_args), + }, + ) as tool_span: + result = tools_by_name[tool_name](**tool_args) + tool_span.set_attribute("tool.result", json.dumps(result)) + + tool_responses.append( + { + "role": "tool", + "tool_call_id": tool_call.id, + "content": json.dumps(result), + } + ) + + # Make follow-up call with tool results + follow_up = litellm.completion( + model="openai/gpt-4.1-mini", + messages=[ + { + "role": "system", + "content": "You are an order processing assistant.", + }, + *input.messages, + message, + *tool_responses, + ], + ) + return follow_up.choices[0].message.content or "" # type: ignore[union-attr] + + return message.content or "" + + +@pytest.mark.agent_test +@pytest.mark.asyncio +async def test_native_otel_span_evaluation(): + """ + Verifies that native OpenTelemetry spans are visible to the judge. + + This test demonstrates that spans created with the standard OpenTelemetry API + (tracer.start_as_current_span) are captured and available for judge evaluation. + + The judge can verify: + - HTTP call spans (http.fraud_check) + - Database query spans (db.query) + - Tool execution spans (tool.check_inventory) + """ + result = await scenario.run( + name="native otel span evaluation", + description=""" + A customer asks about product SKU-123 availability. + The agent should check inventory and respond. + """, + agents=[ + NativeOtelAgent(), + scenario.UserSimulatorAgent(model="openai/gpt-4.1-mini"), + scenario.JudgeAgent( + model="openai/gpt-4.1", + criteria=[ + "A fraud check HTTP call was made (http.fraud_check span exists)", + "A database query was performed (db.query span exists)", + "The check_inventory tool was called for the product", + ], + ), + ], + script=[ + scenario.user("Is product SKU-123 in stock?"), + scenario.agent(), + scenario.judge(), + ], + max_turns=5, + set_id="python-examples", + ) + + print(f"\nResult: {result}") + print(f"Success: {result.success}") + print(f"Reasoning: {result.reasoning}") + if result.passed_criteria: + print(f"Passed criteria: {result.passed_criteria}") + if result.failed_criteria: + print(f"Failed criteria: {result.failed_criteria}") + + assert result.success, f"Expected success but got: {result.reasoning}" diff --git a/python/examples/test_tool_failure_simulation.py b/python/examples/test_tool_failure_simulation.py index 47bd92db..ffd64ff2 100644 --- a/python/examples/test_tool_failure_simulation.py +++ b/python/examples/test_tool_failure_simulation.py @@ -207,6 +207,7 @@ async def test_tool_rate_limit_simulation(): @pytest.mark.agent_test +@pytest.mark.flaky(reruns=2) @pytest.mark.asyncio async def test_tool_success_simulation(): """Test agent's ability to handle successful tool calls.""" diff --git a/python/examples/test_voice_to_voice_conversation.py b/python/examples/test_voice_to_voice_conversation.py index 8a4f0190..a2e05ce7 100644 --- a/python/examples/test_voice_to_voice_conversation.py +++ b/python/examples/test_voice_to_voice_conversation.py @@ -102,6 +102,7 @@ async def call(self, input: AgentInput) -> AgentReturnTypes: ) +@pytest.mark.flaky(reruns=2) @pytest.mark.asyncio async def test_voice_to_voice_conversation(): """ diff --git a/python/examples/test_weather_agent.py b/python/examples/test_weather_agent.py index 59e633b2..d6983945 100644 --- a/python/examples/test_weather_agent.py +++ b/python/examples/test_weather_agent.py @@ -12,6 +12,7 @@ @pytest.mark.agent_test @pytest.mark.asyncio +@pytest.mark.flaky(reruns=2) async def test_weather_agent(): # Integrate with your agent class WeatherAgent(scenario.AgentAdapter): @@ -34,6 +35,9 @@ def check_for_weather_tool_call(state: scenario.ScenarioState): scenario.UserSimulatorAgent(model="openai/gpt-4.1"), ], script=[ + scenario.user(), + scenario.agent(), + # Agent sometimes needs to ask for clarification scenario.user(), scenario.agent(), check_for_weather_tool_call, @@ -87,6 +91,7 @@ def weather_agent(messages, response_messages=[]) -> scenario.AgentReturnTypes: "content": """ You a helpful assistant that may help the user with weather information. Do not guess the city if they don't provide it. + Do not ask for clarification """, }, *messages, diff --git a/python/pyproject.toml b/python/pyproject.toml index c9ebd81e..f27852cf 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -22,6 +22,7 @@ classifiers = [ ] dependencies = [ "pytest>=8.1.1", + "pytest-rerunfailures>=14.0", "litellm>=1.49.0", "openai>=1.88.0", "python-dotenv>=1.0.1", @@ -37,6 +38,7 @@ dependencies = [ "python-dateutil>=2.9.0.post0", "pydantic-settings>=2.9.1", "langwatch>=0.2.19", + "opentelemetry-sdk>=1.20.0", ] [project.optional-dependencies] diff --git a/python/scenario/__init__.py b/python/scenario/__init__.py index 7e819771..e9556e71 100644 --- a/python/scenario/__init__.py +++ b/python/scenario/__init__.py @@ -93,6 +93,10 @@ async def test_weather_agent(): For more examples and detailed documentation, visit: https://github.com/langwatch/scenario """ +# Setup logging and tracing infrastructure (side-effect imports) +from .config import logging as _logging_config # noqa: F401 +from . import _tracing # noqa: F401 + # First import non-dependent modules from .types import ScenarioResult, AgentInput, AgentRole, AgentReturnTypes from .config import ScenarioConfig diff --git a/python/scenario/_judge/__init__.py b/python/scenario/_judge/__init__.py new file mode 100644 index 00000000..0cc04a1b --- /dev/null +++ b/python/scenario/_judge/__init__.py @@ -0,0 +1,15 @@ +""" +Judge utilities for span processing and formatting. +""" + +from .judge_span_digest_formatter import ( + JudgeSpanDigestFormatter, + judge_span_digest_formatter, +) +from .judge_utils import JudgeUtils + +__all__ = [ + "JudgeSpanDigestFormatter", + "judge_span_digest_formatter", + "JudgeUtils", +] diff --git a/python/scenario/_judge/deep_transform.py b/python/scenario/_judge/deep_transform.py new file mode 100644 index 00000000..2a7ebc4b --- /dev/null +++ b/python/scenario/_judge/deep_transform.py @@ -0,0 +1,32 @@ +""" +Recursive value transformation utility. +""" + +from typing import Any, Callable, Dict, List + + +def deep_transform(value: Any, fn: Callable[[Any], Any]) -> Any: + """ + Recursively transforms values in a structure. + + If callback returns a different value, uses it and stops recursion for that branch. + Otherwise recurses into lists/dicts. + + Args: + value: The value to transform + fn: Transformation function applied to each value + + Returns: + Transformed value + """ + result = fn(value) + if result is not value: + return result + + if isinstance(value, list): + return [deep_transform(v, fn) for v in value] + + if isinstance(value, dict): + return {k: deep_transform(v, fn) for k, v in value.items()} + + return value diff --git a/python/scenario/_judge/judge_span_digest_formatter.py b/python/scenario/_judge/judge_span_digest_formatter.py new file mode 100644 index 00000000..459dcf34 --- /dev/null +++ b/python/scenario/_judge/judge_span_digest_formatter.py @@ -0,0 +1,304 @@ +""" +Formats OpenTelemetry spans into a plain-text digest for judge evaluation. +""" + +import json +import logging +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple, cast + +from opentelemetry.sdk.trace import ReadableSpan +from opentelemetry.trace import StatusCode +from opentelemetry.util.types import AttributeValue + +from .deep_transform import deep_transform +from .string_deduplicator import StringDeduplicator +from .truncate_media import truncate_media_url, truncate_media_part + + +logger = logging.getLogger("scenario.judge") + + +@dataclass +class SpanNode: + """Represents a span node in the hierarchy tree.""" + + span: ReadableSpan + children: List["SpanNode"] + + +class JudgeSpanDigestFormatter: + """ + Transforms OpenTelemetry spans into a complete plain-text digest for judge evaluation. + + Deduplicates repeated string content to reduce token usage. + """ + + def __init__(self) -> None: + self._deduplicator = StringDeduplicator(threshold=50) + + def format(self, spans: Sequence[ReadableSpan]) -> str: + """ + Formats spans into a complete digest with full content and nesting. + + Args: + spans: All spans for a thread + + Returns: + Plain text digest + """ + self._deduplicator.reset() + + logger.debug( + "format() called", + extra={ + "span_count": len(spans), + "span_names": [s.name for s in spans], + }, + ) + + if not spans: + logger.debug("No spans to format") + return "No spans recorded." + + sorted_spans = self._sort_by_start_time(spans) + tree = self._build_hierarchy(sorted_spans) + total_duration = self._calculate_total_duration(sorted_spans) + + logger.debug( + "Hierarchy built", + extra={ + "root_count": len(tree), + "total_duration": total_duration, + }, + ) + + lines: List[str] = [ + f"Spans: {len(spans)} | Total Duration: {self._format_duration(total_duration)}", + "", + ] + + sequence = 1 + root_count = len(tree) + for idx, node in enumerate(tree): + sequence = self._render_node( + node, lines, depth=0, sequence=sequence, is_last=(idx == root_count - 1) + ) + + errors = self._collect_errors(spans) + if errors: + lines.append("") + lines.append("=== ERRORS ===") + lines.extend(errors) + + return "\n".join(lines) + + def _sort_by_start_time(self, spans: Sequence[ReadableSpan]) -> List[ReadableSpan]: + """Sorts spans by start time.""" + return sorted(spans, key=lambda s: self._hr_time_to_ms(s.start_time or 0)) + + def _build_hierarchy(self, spans: List[ReadableSpan]) -> List[SpanNode]: + """Builds a tree structure from flat span list.""" + span_map: Dict[int, SpanNode] = {} + roots: List[SpanNode] = [] + + for span in spans: + span_ctx = span.get_span_context() + span_id = span_ctx.span_id if span_ctx else 0 + span_map[span_id] = SpanNode(span=span, children=[]) + + for span in spans: + span_ctx = span.get_span_context() + span_id = span_ctx.span_id if span_ctx else 0 + node = span_map[span_id] + parent_ctx = span.parent + + if parent_ctx is not None: + parent_id = parent_ctx.span_id + if parent_id in span_map: + span_map[parent_id].children.append(node) + else: + roots.append(node) + else: + roots.append(node) + + return roots + + def _render_node( + self, + node: SpanNode, + lines: List[str], + depth: int, + sequence: int, + is_last: bool = True, + ) -> int: + """Renders a span node and its children.""" + span = node.span + duration = self._calculate_span_duration(span) + timestamp = self._format_timestamp(span.start_time or 0) + status = self._get_status_indicator(span) + + prefix = self._get_tree_prefix(depth, is_last) + lines.append( + f"{prefix}[{sequence}] {timestamp} {span.name} ({self._format_duration(duration)}){status}" + ) + + attr_indent = self._get_attr_indent(depth, is_last) + attrs = self._clean_attributes(dict(span.attributes) if span.attributes else {}) + for key, value in attrs.items(): + lines.append(f"{attr_indent}{key}: {self._format_value(value)}") + + if span.events: + for event in span.events: + lines.append(f"{attr_indent}[event] {event.name}") + if event.attributes: + event_attrs = self._clean_attributes(dict(event.attributes)) + for key, value in event_attrs.items(): + lines.append( + f"{attr_indent} {key}: {self._format_value(value)}" + ) + + lines.append("") + + next_seq = sequence + 1 + child_count = len(node.children) + for idx, child in enumerate(node.children): + next_seq = self._render_node( + child, lines, depth + 1, next_seq, is_last=(idx == child_count - 1) + ) + + return next_seq + + def _get_tree_prefix(self, depth: int, is_last: bool) -> str: + """Gets tree drawing prefix for a given depth.""" + if depth == 0: + return "" + connector = "└── " if is_last else "├── " + return "│ " * (depth - 1) + connector + + def _get_attr_indent(self, depth: int, is_last: bool) -> str: + """Gets attribute indentation for a given depth.""" + if depth == 0: + return " " + continuation = " " if is_last else "│ " + return "│ " * (depth - 1) + continuation + " " + + def _clean_attributes(self, attrs: Dict[str, Any]) -> Dict[str, Any]: + """Cleans attributes by removing internal keys.""" + cleaned: Dict[str, Any] = {} + seen: set = set() + + for key, value in attrs.items(): + clean_key = ( + key.replace("langwatch.", "", 1) + if key.startswith("langwatch.") + else key + ) + if clean_key in ["thread.id", "scenario.id", "scenario.name"]: + continue + if clean_key not in seen: + seen.add(clean_key) + cleaned[clean_key] = value + + return cleaned + + def _format_value(self, value: Any) -> str: + """Formats a value for display.""" + processed = self._transform_value(value) + if isinstance(processed, str): + return processed + return json.dumps(processed) + + def _transform_value(self, value: Any) -> Any: + """Transforms a value, handling media and deduplication.""" + + def transform_fn(v: Any) -> Any: + # AI SDK media parts + media_part = truncate_media_part(v) + if media_part is not None: + return media_part + + # Not a string - continue traversal + if not isinstance(v, str): + return v + + # String transforms + return self._transform_string(v) + + return deep_transform(value, transform_fn) + + def _transform_string(self, s: str) -> str: + """Transforms a string, handling JSON, data URLs, and deduplication.""" + # JSON strings - parse and recurse + if self._looks_like_json(s): + try: + processed = self._transform_value(json.loads(s)) + return json.dumps(processed) + except json.JSONDecodeError: + pass + + # Data URLs -> marker + truncated = truncate_media_url(s) + if truncated != s: + return truncated + + # Dedup + return self._deduplicator.process(s) + + def _looks_like_json(self, s: str) -> bool: + """Checks if a string looks like JSON.""" + t = s.strip() + return (t.startswith("{") and t.endswith("}")) or ( + t.startswith("[") and t.endswith("]") + ) + + def _hr_time_to_ms(self, hr_time: int) -> float: + """Converts nanoseconds to milliseconds.""" + return hr_time / 1_000_000 + + def _calculate_span_duration(self, span: ReadableSpan) -> float: + """Calculates span duration in milliseconds.""" + start = span.start_time or 0 + end = span.end_time or 0 + return self._hr_time_to_ms(end) - self._hr_time_to_ms(start) + + def _calculate_total_duration(self, spans: List[ReadableSpan]) -> float: + """Calculates total duration from first start to last end.""" + if not spans: + return 0 + first = self._hr_time_to_ms(spans[0].start_time or 0) + last = max(self._hr_time_to_ms(s.end_time or 0) for s in spans) + return last - first + + def _format_duration(self, ms: float) -> str: + """Formats duration in human-readable form.""" + if ms < 1000: + return f"{round(ms)}ms" + return f"{ms / 1000:.2f}s" + + def _format_timestamp(self, hr_time: int) -> str: + """Formats nanoseconds timestamp as ISO string.""" + ms = self._hr_time_to_ms(hr_time) + dt = datetime.fromtimestamp(ms / 1000, tz=timezone.utc) + return dt.isoformat().replace("+00:00", "Z") + + def _get_status_indicator(self, span: ReadableSpan) -> str: + """Gets error indicator if span has error status.""" + if span.status.status_code == StatusCode.ERROR: + message = span.status.description or "unknown" + return f" ⚠️ ERROR: {message}" + return "" + + def _collect_errors(self, spans: Sequence[ReadableSpan]) -> List[str]: + """Collects error messages from failed spans.""" + errors = [] + for s in spans: + if s.status.status_code == StatusCode.ERROR: + message = s.status.description or "unknown error" + errors.append(f"- {s.name}: {message}") + return errors + + +# Singleton instance +judge_span_digest_formatter = JudgeSpanDigestFormatter() diff --git a/python/scenario/_judge/judge_utils.py b/python/scenario/_judge/judge_utils.py new file mode 100644 index 00000000..ac55ad27 --- /dev/null +++ b/python/scenario/_judge/judge_utils.py @@ -0,0 +1,111 @@ +""" +Utilities for the Judge agent. +""" + +import json +import re +from typing import Any, Dict, List + +from openai.types.chat import ChatCompletionMessageParam + +from .deep_transform import deep_transform + + +def _truncate_base64_media(value: Any) -> Any: + """ + Truncates base64 media data to reduce token usage. + + Handles: + - Data URLs: `data:image/png;base64,...` + - AI SDK file parts: `{ type: "file", mediaType: "audio/wav", data: "" }` + - Raw base64 strings over threshold (likely binary data) + """ + + def transform_fn(v: Any) -> Any: + if isinstance(v, str): + # Handle data URLs + match = re.match( + r"^data:((image|audio|video)/[a-z0-9+.-]+);base64,(.+)$", + v, + re.IGNORECASE, + ) + if match: + mime_type = match.group(1) + media_type = match.group(2).upper() + size = len(match.group(3)) + return f"[{media_type}: {mime_type}, ~{size} bytes]" + return v + + if isinstance(v, dict): + obj = v + + # Handle AI SDK file parts: {type: "file", mediaType: "...", data: ""} + if ( + obj.get("type") == "file" + and isinstance(obj.get("mediaType"), str) + and isinstance(obj.get("data"), str) + ): + media_type = obj["mediaType"] + category = ( + media_type.split("/")[0].upper() if "/" in media_type else "FILE" + ) + return { + **obj, + "data": f"[{category}: {media_type}, ~{len(obj['data'])} bytes]", + } + + # Handle image parts with raw base64: {type: "image", image: ""} + if obj.get("type") == "image" and isinstance(obj.get("image"), str): + image_data = obj["image"] + + # Check if it's a data URL or raw base64 + data_url_match = re.match( + r"^data:((image)/[a-z0-9+.-]+);base64,(.+)$", + image_data, + re.IGNORECASE, + ) + if data_url_match: + return { + **obj, + "image": f"[IMAGE: {data_url_match.group(1)}, ~{len(data_url_match.group(3))} bytes]", + } + + # Raw base64 (long string without common text patterns) + if len(image_data) > 1000 and re.match( + r"^[A-Za-z0-9+/=]+$", image_data + ): + return { + **obj, + "image": f"[IMAGE: unknown, ~{len(image_data)} bytes]", + } + + return v + + return deep_transform(value, transform_fn) + + +class JudgeUtils: + """Utilities for the Judge agent.""" + + @staticmethod + def build_transcript_from_messages( + messages: List[ChatCompletionMessageParam], + ) -> str: + """ + Builds a minimal transcript from messages for judge evaluation. + + Truncates base64 media to reduce token usage. + + Args: + messages: Array of ChatCompletionMessageParam from conversation + + Returns: + Plain text transcript with one message per line + """ + lines = [] + for msg in messages: + role = msg.get("role", "unknown") + content = msg.get("content", "") + truncated_content = _truncate_base64_media(content) + lines.append(f"{role}: {json.dumps(truncated_content)}") + return "\n".join(lines) diff --git a/python/scenario/_judge/string_deduplicator.py b/python/scenario/_judge/string_deduplicator.py new file mode 100644 index 00000000..92ad9fa6 --- /dev/null +++ b/python/scenario/_judge/string_deduplicator.py @@ -0,0 +1,56 @@ +""" +String deduplication for reducing token usage in digests. +""" + +import re +from typing import Dict + + +class StringDeduplicator: + """ + Tracks seen strings and replaces duplicates with markers. + + Only operates on strings; does not handle traversal. + """ + + def __init__(self, *, threshold: int) -> None: + """ + Args: + threshold: Minimum string length to consider for deduplication + """ + self._seen: Dict[str, bool] = {} + self._threshold = threshold + + def reset(self) -> None: + """Resets seen strings for a new digest.""" + self._seen.clear() + + def process(self, s: str) -> str: + """ + Processes a string, returning duplicate marker if seen before. + + Args: + s: String to process + + Returns: + Original string or duplicate marker + """ + if len(s) < self._threshold: + return s + + key = self._normalize(s) + if key in self._seen: + return "[DUPLICATE - SEE ABOVE]" + + self._seen[key] = True + return s + + def _normalize(self, s: str) -> str: + """Normalizes string for comparison (whitespace, case).""" + # JSON-escaped whitespace + result = re.sub(r"\\[nrt]", " ", s) + # Actual whitespace + result = re.sub(r"[\n\r\t]", " ", result) + # Collapse + result = re.sub(r"\s+", " ", result) + return result.strip().lower() diff --git a/python/scenario/_judge/truncate_media.py b/python/scenario/_judge/truncate_media.py new file mode 100644 index 00000000..fe9a6acb --- /dev/null +++ b/python/scenario/_judge/truncate_media.py @@ -0,0 +1,84 @@ +""" +Media truncation utilities for reducing token usage. +""" + +import re +from typing import Any, Dict, Optional + + +def truncate_media_url(s: str) -> str: + """ + Truncates base64 data URLs to human-readable markers. + + Args: + s: String to check + + Returns: + Marker if data URL, original string otherwise + """ + match = re.match( + r"^data:((image|audio|video)/[a-z0-9+.-]+);base64,(.+)$", + s, + re.IGNORECASE, + ) + if not match: + return s + + mime_type = match.group(1) + category = match.group(2).upper() + data = match.group(3) + return f"[{category}: {mime_type}, ~{len(data)} bytes]" + + +def truncate_media_part(v: Any) -> Optional[Dict[str, Any]]: + """ + Truncates AI SDK file/image parts by replacing base64 data with markers. + + Args: + v: Value to check + + Returns: + Truncated object if media part, None otherwise + """ + if v is None or not isinstance(v, dict): + return None + + obj = v + + # AI SDK file parts: {type: "file", mediaType: "...", data: "..."} + if ( + obj.get("type") == "file" + and isinstance(obj.get("mediaType"), str) + and isinstance(obj.get("data"), str) + ): + media_type = obj["mediaType"] + category = media_type.split("/")[0].upper() if "/" in media_type else "FILE" + return { + **obj, + "data": f"[{category}: {media_type}, ~{len(obj['data'])} bytes]", + } + + # AI SDK image parts: {type: "image", image: "..."} + if obj.get("type") == "image" and isinstance(obj.get("image"), str): + image_data = obj["image"] + + # Data URL format + data_url_match = re.match( + r"^data:((image)/[a-z0-9+.-]+);base64,(.+)$", + image_data, + re.IGNORECASE, + ) + if data_url_match: + return { + **obj, + "image": f"[IMAGE: {data_url_match.group(1)}, ~{len(data_url_match.group(3))} bytes]", + } + + # Raw base64 (long string without common text patterns) + if len(image_data) > 1000 and re.match(r"^[A-Za-z0-9+/=]+$", image_data): + return { + **obj, + "image": f"[IMAGE: unknown, ~{len(image_data)} bytes]", + } + + return None diff --git a/python/scenario/_tracing/__init__.py b/python/scenario/_tracing/__init__.py new file mode 100644 index 00000000..adfa85b3 --- /dev/null +++ b/python/scenario/_tracing/__init__.py @@ -0,0 +1,20 @@ +""" +Tracing infrastructure for scenario testing. + +This module sets up OpenTelemetry instrumentation with LangWatch, +registering the JudgeSpanCollector to capture spans for judge evaluation. + +Importing this module triggers setup as a side-effect. +""" + +from .setup import setup_observability +from .judge_span_collector import judge_span_collector, JudgeSpanCollector + +# Trigger setup on import +setup_observability() + +__all__ = [ + "judge_span_collector", + "JudgeSpanCollector", + "setup_observability", +] diff --git a/python/scenario/_tracing/judge_span_collector.py b/python/scenario/_tracing/judge_span_collector.py new file mode 100644 index 00000000..53fd7ffc --- /dev/null +++ b/python/scenario/_tracing/judge_span_collector.py @@ -0,0 +1,83 @@ +""" +Collects OpenTelemetry spans for judge evaluation. + +Implements SpanProcessor to intercept spans as they complete, +storing them for later retrieval by thread ID. +""" + +from typing import List, Dict, Optional +from opentelemetry.context import Context +from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan + + +class JudgeSpanCollector(SpanProcessor): + """ + Collects OpenTelemetry spans for judge evaluation. + + Implements SpanProcessor to intercept spans as they complete. + Spans can be retrieved by thread ID for inclusion in judge prompts. + """ + + def __init__(self) -> None: + self._spans: List[ReadableSpan] = [] + + def on_start( + self, + span: ReadableSpan, + parent_context: Optional[Context] = None, + ) -> None: + """Called when a span starts. No-op for collection purposes.""" + pass + + def on_end(self, span: ReadableSpan) -> None: + """Called when a span ends. Stores the span for later retrieval.""" + self._spans.append(span) + + def shutdown(self) -> None: + """Shuts down the processor, clearing all stored spans.""" + self._spans = [] + + def force_flush(self, timeout_millis: Optional[int] = None) -> bool: + """Force flush is a no-op for this collector.""" + return True + + def get_spans_for_thread(self, thread_id: str) -> List[ReadableSpan]: + """ + Retrieves all spans associated with a specific thread. + + Traverses parent relationships to find spans belonging to a thread, + even if the thread ID is only set on an ancestor span. + + Args: + thread_id: The thread identifier to filter spans by + + Returns: + List of spans for the given thread + """ + span_map: Dict[int, ReadableSpan] = {} + + # Index all spans by ID + for span in self._spans: + span_ctx = span.get_span_context() + span_id = span_ctx.span_id if span_ctx else 0 + span_map[span_id] = span + + def belongs_to_thread(span: ReadableSpan) -> bool: + """Check if span or any ancestor belongs to thread.""" + attrs = span.attributes or {} + if attrs.get("langwatch.thread.id") == thread_id: + return True + + parent_ctx = span.parent + if parent_ctx is not None: + parent_id = parent_ctx.span_id + if parent_id in span_map: + return belongs_to_thread(span_map[parent_id]) + + return False + + return [s for s in self._spans if belongs_to_thread(s)] + + +# Singleton instance +judge_span_collector = JudgeSpanCollector() diff --git a/python/scenario/_tracing/setup.py b/python/scenario/_tracing/setup.py new file mode 100644 index 00000000..01a87fad --- /dev/null +++ b/python/scenario/_tracing/setup.py @@ -0,0 +1,58 @@ +""" +Sets up OpenTelemetry span collection for scenario testing. + +Single responsibility: Register the judge span collector with the tracer provider. +""" + +import logging +from typing import Optional + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider + +from .judge_span_collector import judge_span_collector + + +logger = logging.getLogger("scenario.tracing") + +_setup_complete = False + + +def setup_observability() -> Optional[TracerProvider]: + """ + Registers the judge span collector with the existing tracer provider. + + This function is idempotent - calling it multiple times has no effect + after the first successful setup. + + Note: This does NOT call langwatch.setup() - that is handled by the + scenario_executor when creating traces. This only adds our span collector + to capture spans for judge evaluation. + + Returns: + The TracerProvider if setup succeeded, or None if already set up + """ + global _setup_complete + + if _setup_complete: + return None + + _setup_complete = True + + # Get the existing tracer provider or create one + existing_provider = trace.get_tracer_provider() + + # If there's already a TracerProvider configured, add our processor to it + if isinstance(existing_provider, TracerProvider): + existing_provider.add_span_processor(judge_span_collector) + logger.debug("Added judge span collector to existing TracerProvider") + return existing_provider + + # Otherwise create a new one with our collector + # (langwatch.trace() in scenario_executor will handle its own setup) + tracer_provider = TracerProvider() + tracer_provider.add_span_processor(judge_span_collector) + trace.set_tracer_provider(tracer_provider) + logger.debug("Created new TracerProvider with judge span collector") + + return tracer_provider diff --git a/python/scenario/config/logging.py b/python/scenario/config/logging.py new file mode 100644 index 00000000..72e9fc5f --- /dev/null +++ b/python/scenario/config/logging.py @@ -0,0 +1,43 @@ +""" +Logging configuration for Scenario. + +Configures the scenario logger based on SCENARIO_LOG_LEVEL environment variable. +""" + +import logging +import os +from typing import Optional + + +def configure_logging() -> None: + """ + Configure the scenario logger based on SCENARIO_LOG_LEVEL env var. + + Supported levels: DEBUG, INFO, WARNING, ERROR, CRITICAL + If not set, logging remains unconfigured (silent). + """ + level_str: Optional[str] = os.environ.get("SCENARIO_LOG_LEVEL") + + if not level_str: + return + + level_str = level_str.upper() + level = getattr(logging, level_str, None) + + if not isinstance(level, int): + return + + logger = logging.getLogger("scenario") + logger.setLevel(level) + + if not logger.handlers: + handler = logging.StreamHandler() + handler.setLevel(level) + formatter = logging.Formatter( + "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + ) + handler.setFormatter(formatter) + logger.addHandler(handler) + + +configure_logging() diff --git a/python/scenario/judge_agent.py b/python/scenario/judge_agent.py index ca8cc4c6..8abbaeaf 100644 --- a/python/scenario/judge_agent.py +++ b/python/scenario/judge_agent.py @@ -10,7 +10,7 @@ import json import logging import re -from typing import List, Optional, cast +from typing import Any, List, Optional, cast import litellm from litellm import Choices @@ -21,6 +21,8 @@ from scenario.config import ModelConfig, ScenarioConfig from ._error_messages import agent_not_configured_error_message +from ._judge import JudgeUtils, judge_span_digest_formatter +from ._tracing import judge_span_collector, JudgeSpanCollector from .types import AgentInput, AgentReturnTypes, AgentRole, ScenarioResult @@ -106,6 +108,7 @@ class JudgeAgent(AgentAdapter): criteria: List[str] system_prompt: Optional[str] _extra_params: dict + _span_collector: JudgeSpanCollector def __init__( self, @@ -117,6 +120,7 @@ def __init__( temperature: float = 0.0, max_tokens: Optional[int] = None, system_prompt: Optional[str] = None, + span_collector: Optional[JudgeSpanCollector] = None, **extra_params, ): """ @@ -137,6 +141,7 @@ def __init__( max_tokens: Maximum number of tokens for judge reasoning and explanations. system_prompt: Custom system prompt to override default judge behavior. Use this to create specialized evaluation perspectives. + span_collector: Optional span collector for telemetry. Defaults to global singleton. Raises: Exception: If no model is configured either in parameters or global config @@ -173,6 +178,7 @@ def __init__( self.temperature = temperature self.max_tokens = max_tokens self.system_prompt = system_prompt + self._span_collector = span_collector or judge_span_collector if model: self.model = model @@ -248,6 +254,21 @@ async def call( scenario = input.scenario_state + # Build transcript and traces digest + transcript = JudgeUtils.build_transcript_from_messages(input.messages) + traces_digest = self._get_opentelemetry_traces_digest(input.thread_id) + + logger.debug(f"OpenTelemetry traces built: {traces_digest[:200]}...") + + content_for_judge = f""" + +{transcript} + + +{traces_digest} + +""" + criteria_str = "\n".join( [f"{idx + 1}. {criterion}" for idx, criterion in enumerate(self.criteria)] ) @@ -280,7 +301,7 @@ async def call( """, }, - *input.messages, + {"role": "user", "content": content_for_judge}, ] is_last_message = ( @@ -427,7 +448,7 @@ async def call( # Return the appropriate ScenarioResult based on the verdict return ScenarioResult( success=verdict == "success" and len(failed_criteria) == 0, - messages=messages, + messages=cast(Any, messages), reasoning=reasoning, passed_criteria=passed_criteria, failed_criteria=failed_criteria, @@ -451,3 +472,16 @@ async def call( raise Exception( f"Unexpected response format from LLM: {response.__repr__()}" ) + + def _get_opentelemetry_traces_digest(self, thread_id: str) -> str: + """ + Retrieves and formats OpenTelemetry traces for a thread. + + Args: + thread_id: The thread identifier to get traces for + + Returns: + Formatted digest of traces + """ + spans = self._span_collector.get_spans_for_thread(thread_id) + return judge_span_digest_formatter.format(spans) diff --git a/python/scenario/scenario_executor.py b/python/scenario/scenario_executor.py index 31838f79..01b7041f 100644 --- a/python/scenario/scenario_executor.py +++ b/python/scenario/scenario_executor.py @@ -43,7 +43,13 @@ ChatCompletionAssistantMessageParam, ) -from .types import AgentInput, AgentRole, ChatCompletionMessageParamWithTrace, ScenarioResult, ScriptStep +from .types import ( + AgentInput, + AgentRole, + ChatCompletionMessageParamWithTrace, + ScenarioResult, + ScriptStep, +) from ._error_messages import agent_response_not_awaitable from .cache import context_scenario from .agent_adapter import AgentAdapter @@ -521,44 +527,50 @@ async def _call_agent( ChatCompletionUserMessageParam(role="user", content=input_message) ] - with self._trace.span(type="agent", name=f"{agent.__class__.__name__}.call") as span: - with show_spinner( - text=( - "Judging..." - if role == AgentRole.JUDGE - else f"{role.value if isinstance(role, AgentRole) else role}:" - ), - color=( - "blue" - if role == AgentRole.AGENT - else "green" if role == AgentRole.USER else "yellow" - ), - enabled=self.config.verbose, - ): - start_time = time.time() - - # Prevent pydantic validation warnings which should already be disabled - with warnings.catch_warnings(): - warnings.simplefilter("ignore") - - self._trace.autotrack_litellm_calls(litellm) - - agent_response = agent.call( - AgentInput( - # TODO: test thread_id - thread_id=self._state.thread_id, - messages=cast(List[ChatCompletionMessageParam], self._state.messages), - new_messages=self._pending_messages.get(idx, []), - judgment_request=request_judgment, - scenario_state=self._state, + try: + with self._trace.span( + type="agent", name=f"{agent.__class__.__name__}.call" + ) as span: + span.set_attributes({"langwatch.thread.id": self._state.thread_id}) + with show_spinner( + text=( + "Judging..." + if role == AgentRole.JUDGE + else f"{role.value if isinstance(role, AgentRole) else role}:" + ), + color=( + "blue" + if role == AgentRole.AGENT + else "green" if role == AgentRole.USER else "yellow" + ), + enabled=self.config.verbose, + ): + start_time = time.time() + + # Prevent pydantic validation warnings which should already be disabled + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + + self._trace.autotrack_litellm_calls(litellm) + + agent_response = agent.call( + AgentInput( + thread_id=self._state.thread_id, + messages=cast( + List[ChatCompletionMessageParam], + self._state.messages, + ), + new_messages=self._pending_messages.get(idx, []), + judgment_request=request_judgment, + scenario_state=self._state, + ) + ) + if not isinstance(agent_response, Awaitable): + raise Exception( + agent_response_not_awaitable(agent.__class__.__name__), ) - ) - if not isinstance(agent_response, Awaitable): - raise Exception( - agent_response_not_awaitable(agent.__class__.__name__), - ) - agent_response = await agent_response + agent_response = await agent_response if idx not in self._agent_times: self._agent_times[idx] = 0 @@ -599,6 +611,9 @@ async def _call_agent( ) return messages + except Exception as e: + agent_name = agent.__class__.__name__ + raise RuntimeError(f"[{agent_name}] {e}") from e def _scenario_name(self): if self.config.verbose == 2: diff --git a/python/tests/test_context_window_exceeded_integration.py b/python/tests/test_context_window_exceeded_integration.py new file mode 100644 index 00000000..223804a9 --- /dev/null +++ b/python/tests/test_context_window_exceeded_integration.py @@ -0,0 +1,147 @@ +""" +Integration tests for context window exceeded error handling. + +Tests that when an agent exceeds the LLM context window: +1. The error identifies which agent caused the failure +2. Reports are still sent even when errors occur + +See: specs/context-window-exceeded.feature +""" + +import pytest +from typing import List + +import scenario +from scenario._generated.langwatch_api_client.lang_watch_api_client.types import Unset +from scenario.scenario_executor import ScenarioExecutor +from scenario._events import ( + ScenarioEvent, + ScenarioRunFinishedEvent, + ScenarioEventBus, + EventReporter, +) + + +class VerboseAgent(scenario.AgentAdapter): + """Agent that returns a response large enough to exceed context limits.""" + + async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes: + giant_response = "Here's some weather data:\n" + ("x" * 200_000) + return {"role": "assistant", "content": giant_response} + + +class MockUserSimulatorAgent(scenario.AgentAdapter): + """Mock UserSimulatorAgent that returns a simple message without API calls.""" + + role = scenario.AgentRole.USER + + async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes: + return {"role": "user", "content": "What's the weather like today?"} + + +class MockJudgeAgent(scenario.AgentAdapter): + """Mock JudgeAgent that raises a context window exceeded error.""" + + role = scenario.AgentRole.JUDGE + + async def call(self, input: scenario.AgentInput) -> scenario.AgentReturnTypes: + raise Exception( + "This model's maximum context length is 16385 tokens. " + "However, your messages resulted in 50000 tokens." + ) + + +class MockEventReporter(EventReporter): + """Captures events without HTTP calls.""" + + def __init__(self) -> None: + self.posted_events: List[ScenarioEvent] = [] + + async def post_event(self, event: ScenarioEvent) -> dict: + self.posted_events.append(event) + return {} + + +@pytest.mark.asyncio +async def test_error_identifies_agent_that_exceeded_context(): + """ + Scenario: Error identifies the agent that exceeded context + + Given a VerboseAgent that returns 200k characters + And a JudgeAgent with a 16k context model + When the scenario runs + Then it should raise a context window error + And the error message should contain "JudgeAgent" + And the error message should mention "token" or "context" + """ + with pytest.raises(Exception) as exc_info: + await scenario.run( + name="context overflow test", + description="User asks for weather info", + agents=[ + VerboseAgent(), + MockUserSimulatorAgent(), + MockJudgeAgent(), + ], + script=[ + scenario.user(), + scenario.agent(), + scenario.proceed(), + ], + ) + + error_msg = str(exc_info.value) + assert ( + "MockJudgeAgent" in error_msg + ), f"Error should identify MockJudgeAgent, got: {error_msg}" + assert ( + "token" in error_msg.lower() or "context" in error_msg.lower() + ), f"Error should mention tokens or context, got: {error_msg}" + + +@pytest.mark.asyncio +async def test_reports_sent_when_context_exceeded(): + """ + Scenario: Reports are still sent when context is exceeded + + Given a VerboseAgent that returns 200k characters + And a JudgeAgent with a 16k context model + When the scenario runs and fails + Then a ScenarioRunFinishedEvent should be emitted + And the event status should be "ERROR" + And the event reasoning should contain the error message + """ + mock_reporter = MockEventReporter() + event_bus = ScenarioEventBus(event_reporter=mock_reporter) + events: List[ScenarioEvent] = [] + + executor = ScenarioExecutor( + name="context overflow test", + description="User asks for weather info", + agents=[ + VerboseAgent(), + MockUserSimulatorAgent(), + MockJudgeAgent(), + ], + script=[ + scenario.user(), + scenario.agent(), + scenario.proceed(), + ], + event_bus=event_bus, + ) + executor.events.subscribe(events.append) + + with pytest.raises(Exception): + await executor.run() + + finish_events = [e for e in events if isinstance(e, ScenarioRunFinishedEvent)] + assert len(finish_events) == 1, "Should emit finish event even on error" + + finish_event = finish_events[0] + assert finish_event.status.value == "ERROR" + results = finish_event.results + assert not isinstance(results, Unset) and results is not None + reasoning = results.reasoning + assert isinstance(reasoning, str) + assert "MockJudgeAgent" in reasoning diff --git a/python/tests/test_judge_span_collector.py b/python/tests/test_judge_span_collector.py new file mode 100644 index 00000000..d23ca9d4 --- /dev/null +++ b/python/tests/test_judge_span_collector.py @@ -0,0 +1,180 @@ +"""Tests for JudgeSpanCollector.""" + +import pytest +from unittest.mock import MagicMock +from scenario._tracing.judge_span_collector import JudgeSpanCollector + + +def create_mock_span( + *, + span_id: int, + name: str, + parent_span_id: int | None = None, + attributes: dict | None = None, +) -> MagicMock: + """Creates a mock ReadableSpan for testing.""" + span = MagicMock() + span.name = name + span.get_span_context.return_value.span_id = span_id + span.attributes = attributes or {} + + if parent_span_id is not None: + span.parent = MagicMock() + span.parent.span_id = parent_span_id + else: + span.parent = None + + return span + + +class TestJudgeSpanCollector: + """Tests for JudgeSpanCollector.""" + + def test_on_end_stores_span(self) -> None: + """on_end should store spans.""" + collector = JudgeSpanCollector() + span = create_mock_span(span_id=1, name="test") + + collector.on_end(span) + + assert len(collector._spans) == 1 + assert collector._spans[0] == span + + def test_on_start_is_noop(self) -> None: + """on_start should not store anything.""" + collector = JudgeSpanCollector() + span = create_mock_span(span_id=1, name="test") + + collector.on_start(span) + + assert len(collector._spans) == 0 + + def test_shutdown_clears_spans(self) -> None: + """shutdown should clear all stored spans.""" + collector = JudgeSpanCollector() + collector.on_end(create_mock_span(span_id=1, name="span1")) + collector.on_end(create_mock_span(span_id=2, name="span2")) + + collector.shutdown() + + assert len(collector._spans) == 0 + + def test_force_flush_returns_true(self) -> None: + """force_flush should return True.""" + collector = JudgeSpanCollector() + + result = collector.force_flush() + + assert result is True + + +class TestGetSpansForThread: + """Tests for get_spans_for_thread method.""" + + def test_returns_spans_with_matching_thread_id(self) -> None: + """Should return spans that have the matching thread ID.""" + collector = JudgeSpanCollector() + span1 = create_mock_span( + span_id=1, + name="span1", + attributes={"langwatch.thread.id": "thread-123"}, + ) + span2 = create_mock_span( + span_id=2, + name="span2", + attributes={"langwatch.thread.id": "thread-456"}, + ) + collector.on_end(span1) + collector.on_end(span2) + + result = collector.get_spans_for_thread("thread-123") + + assert len(result) == 1 + assert result[0].name == "span1" + + def test_returns_empty_list_when_no_matches(self) -> None: + """Should return empty list when no spans match thread ID.""" + collector = JudgeSpanCollector() + span = create_mock_span( + span_id=1, + name="span1", + attributes={"langwatch.thread.id": "thread-123"}, + ) + collector.on_end(span) + + result = collector.get_spans_for_thread("thread-999") + + assert len(result) == 0 + + def test_returns_child_spans_of_matching_parent(self) -> None: + """Should return child spans when parent has matching thread ID.""" + collector = JudgeSpanCollector() + parent = create_mock_span( + span_id=1, + name="parent", + attributes={"langwatch.thread.id": "thread-123"}, + ) + child = create_mock_span( + span_id=2, + name="child", + parent_span_id=1, + attributes={}, + ) + collector.on_end(parent) + collector.on_end(child) + + result = collector.get_spans_for_thread("thread-123") + + assert len(result) == 2 + names = [s.name for s in result] + assert "parent" in names + assert "child" in names + + def test_returns_deeply_nested_child_spans(self) -> None: + """Should return deeply nested spans when ancestor has matching thread ID.""" + collector = JudgeSpanCollector() + grandparent = create_mock_span( + span_id=1, + name="grandparent", + attributes={"langwatch.thread.id": "thread-123"}, + ) + parent = create_mock_span( + span_id=2, + name="parent", + parent_span_id=1, + attributes={}, + ) + child = create_mock_span( + span_id=3, + name="child", + parent_span_id=2, + attributes={}, + ) + collector.on_end(grandparent) + collector.on_end(parent) + collector.on_end(child) + + result = collector.get_spans_for_thread("thread-123") + + assert len(result) == 3 + + def test_excludes_unrelated_spans(self) -> None: + """Should not return spans from different thread hierarchies.""" + collector = JudgeSpanCollector() + span_a = create_mock_span( + span_id=1, + name="span_a", + attributes={"langwatch.thread.id": "thread-A"}, + ) + span_b = create_mock_span( + span_id=2, + name="span_b", + attributes={"langwatch.thread.id": "thread-B"}, + ) + collector.on_end(span_a) + collector.on_end(span_b) + + result = collector.get_spans_for_thread("thread-A") + + assert len(result) == 1 + assert result[0].name == "span_a" diff --git a/python/tests/test_judge_span_digest_formatter.py b/python/tests/test_judge_span_digest_formatter.py new file mode 100644 index 00000000..14938921 --- /dev/null +++ b/python/tests/test_judge_span_digest_formatter.py @@ -0,0 +1,352 @@ +"""Tests for JudgeSpanDigestFormatter.""" + +from typing import Any, cast +from unittest.mock import MagicMock + +import pytest +from opentelemetry.trace import StatusCode + +from scenario._judge.judge_span_digest_formatter import JudgeSpanDigestFormatter + + +def create_mock_span( + *, + span_id: int, + name: str, + start_time: int, + end_time: int, + parent_span_id: int | None = None, + attributes: dict | None = None, + events: list | None = None, + status_code: StatusCode = StatusCode.OK, + status_description: str | None = None, +) -> MagicMock: + """Creates a mock ReadableSpan for testing.""" + span = MagicMock() + span.name = name + span.start_time = start_time + span.end_time = end_time + span.get_span_context.return_value.span_id = span_id + span.attributes = attributes or {} + span.events = events or [] + span.status.status_code = status_code + span.status.description = status_description + + if parent_span_id is not None: + span.parent = MagicMock() + span.parent.span_id = parent_span_id + else: + span.parent = None + + return span + + +class TestJudgeSpanDigestFormatterEmpty: + """Tests for empty spans case.""" + + def test_returns_empty_marker_when_no_spans(self) -> None: + """Should return empty digest marker when no spans.""" + formatter = JudgeSpanDigestFormatter() + result = formatter.format([]) + assert result == "No spans recorded." + + +class TestJudgeSpanDigestFormatterSingleSpan: + """Tests for single span formatting.""" + + def test_includes_span_name_and_duration(self) -> None: + """Should include span name and duration.""" + formatter = JudgeSpanDigestFormatter() + # 100ms duration (in nanoseconds) + span = create_mock_span( + span_id=1, + name="llm.chat", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + ) + + result = formatter.format([span]) + + assert "llm.chat" in result + assert "100ms" in result + assert "Spans: 1" in result + + def test_includes_attributes(self) -> None: + """Should include span attributes.""" + formatter = JudgeSpanDigestFormatter() + span = create_mock_span( + span_id=1, + name="test", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + attributes={ + "gen_ai.prompt": "Hello", + "gen_ai.completion": "Hi there!", + "model": "gpt-4", + }, + ) + + result = formatter.format([span]) + + assert "gen_ai.prompt: Hello" in result + assert "gen_ai.completion: Hi there!" in result + assert "model: gpt-4" in result + + +class TestJudgeSpanDigestFormatterMultipleSpans: + """Tests for multiple spans formatting.""" + + def test_orders_spans_by_start_time(self) -> None: + """Should order spans by start time.""" + formatter = JudgeSpanDigestFormatter() + spans = [ + create_mock_span( + span_id=2, + name="second", + start_time=1700000001_000_000_000, + end_time=1700000001_100_000_000, + ), + create_mock_span( + span_id=1, + name="first", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + ), + ] + + result = formatter.format(spans) + + first_idx = result.index("first") + second_idx = result.index("second") + assert first_idx < second_idx + + def test_assigns_sequence_numbers(self) -> None: + """Should assign sequence numbers to spans.""" + formatter = JudgeSpanDigestFormatter() + spans = [ + create_mock_span( + span_id=1, + name="first", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + ), + create_mock_span( + span_id=2, + name="second", + start_time=1700000001_000_000_000, + end_time=1700000001_100_000_000, + ), + ] + + result = formatter.format(spans) + + assert "[1]" in result + assert "[2]" in result + + +class TestJudgeSpanDigestFormatterHierarchy: + """Tests for parent-child span hierarchy.""" + + def test_nests_children_under_parent(self) -> None: + """Should nest child spans under parent.""" + formatter = JudgeSpanDigestFormatter() + spans = [ + create_mock_span( + span_id=1, + name="parent", + start_time=1700000000_000_000_000, + end_time=1700000001_000_000_000, + ), + create_mock_span( + span_id=2, + name="child", + parent_span_id=1, + start_time=1700000000_100_000_000, + end_time=1700000000_500_000_000, + ), + ] + + result = formatter.format(spans) + + # Child should appear after parent with tree prefix + parent_idx = result.index("parent") + child_idx = result.index("child") + assert parent_idx < child_idx + # Should have tree drawing characters + assert "├──" in result or "└──" in result + + +class TestJudgeSpanDigestFormatterErrors: + """Tests for error span formatting.""" + + def test_marks_error_spans(self) -> None: + """Should mark spans with error status.""" + formatter = JudgeSpanDigestFormatter() + span = create_mock_span( + span_id=1, + name="failed.operation", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + status_code=StatusCode.ERROR, + status_description="Connection refused", + ) + + result = formatter.format([span]) + + assert "ERROR" in result + assert "Connection refused" in result + + def test_collects_errors_in_summary(self) -> None: + """Should collect errors in summary section.""" + formatter = JudgeSpanDigestFormatter() + spans = [ + create_mock_span( + span_id=1, + name="successful", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + ), + create_mock_span( + span_id=2, + name="failed", + start_time=1700000000_200_000_000, + end_time=1700000000_300_000_000, + status_code=StatusCode.ERROR, + status_description="Error message", + ), + ] + + result = formatter.format(spans) + + assert "=== ERRORS ===" in result + assert "failed: Error message" in result + + +class TestJudgeSpanDigestFormatterEvents: + """Tests for span events formatting.""" + + def test_renders_events(self) -> None: + """Should render span events.""" + formatter = JudgeSpanDigestFormatter() + event = MagicMock() + event.name = "token.generated" + event.attributes = {"token": "Hello", "index": 0} + + span = create_mock_span( + span_id=1, + name="llm.stream", + start_time=1700000000_000_000_000, + end_time=1700000001_000_000_000, + events=[event], + ) + + result = formatter.format([span]) + + assert "[event] token.generated" in result + assert "token: Hello" in result + + +class TestJudgeSpanDigestFormatterFiltering: + """Tests for attribute filtering.""" + + def test_excludes_internal_attributes(self) -> None: + """Should exclude thread.id, scenario.id, scenario.name.""" + formatter = JudgeSpanDigestFormatter() + span = create_mock_span( + span_id=1, + name="test", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + attributes={ + "langwatch.thread.id": "thread-123", + "langwatch.scenario.id": "scenario-456", + "langwatch.scenario.name": "my-scenario", + "relevant.attribute": "should-appear", + }, + ) + + result = formatter.format([span]) + + assert "thread-123" not in result + assert "scenario-456" not in result + assert "my-scenario" not in result + assert "relevant.attribute: should-appear" in result + + +class TestJudgeSpanDigestFormatterDeduplication: + """Tests for string deduplication.""" + + def test_deduplicates_long_strings(self) -> None: + """Should deduplicate long repeated strings.""" + formatter = JudgeSpanDigestFormatter() + long_content = "This is a long string that exceeds the threshold for deduplication testing purposes." + spans = [ + create_mock_span( + span_id=1, + name="first", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + attributes={"content": long_content}, + ), + create_mock_span( + span_id=2, + name="second", + start_time=1700000000_200_000_000, + end_time=1700000000_300_000_000, + attributes={"content": long_content}, + ), + ] + + result = formatter.format(spans) + + assert long_content in result + assert "[DUPLICATE - SEE ABOVE]" in result + + def test_does_not_deduplicate_short_strings(self) -> None: + """Should not deduplicate short strings.""" + formatter = JudgeSpanDigestFormatter() + short_content = "Short" + spans = [ + create_mock_span( + span_id=1, + name="first", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + attributes={"content": short_content}, + ), + create_mock_span( + span_id=2, + name="second", + start_time=1700000000_200_000_000, + end_time=1700000000_300_000_000, + attributes={"content": short_content}, + ), + ] + + result = formatter.format(spans) + + # Short content should appear twice + assert result.count(short_content) == 2 + assert "[DUPLICATE - SEE ABOVE]" not in result + + def test_resets_deduplication_between_calls(self) -> None: + """Should reset deduplication state between format calls.""" + formatter = JudgeSpanDigestFormatter() + long_content = ( + "This content appears in both calls but should show fully each time." + ) + span = create_mock_span( + span_id=1, + name="test", + start_time=1700000000_000_000_000, + end_time=1700000000_100_000_000, + attributes={"content": long_content}, + ) + + result1 = formatter.format([span]) + result2 = formatter.format([span]) + + assert long_content in result1 + assert long_content in result2 + assert "[DUPLICATE - SEE ABOVE]" not in result1 + assert "[DUPLICATE - SEE ABOVE]" not in result2 diff --git a/python/tests/test_judge_utilities.py b/python/tests/test_judge_utilities.py new file mode 100644 index 00000000..404ffcc1 --- /dev/null +++ b/python/tests/test_judge_utilities.py @@ -0,0 +1,225 @@ +"""Tests for judge utility modules.""" + +from typing import Any, cast + +import pytest + +from scenario._judge.deep_transform import deep_transform +from scenario._judge.judge_utils import JudgeUtils +from scenario._judge.string_deduplicator import StringDeduplicator +from scenario._judge.truncate_media import truncate_media_part, truncate_media_url + + +class TestDeepTransform: + """Tests for deep_transform function.""" + + def test_transforms_simple_value(self) -> None: + """Should transform a simple value.""" + result = deep_transform( + "hello", lambda v: v.upper() if isinstance(v, str) else v + ) + assert result == "HELLO" + + def test_transforms_list_elements(self) -> None: + """Should recursively transform list elements.""" + result = deep_transform( + ["a", "b", "c"], + lambda v: v.upper() if isinstance(v, str) else v, + ) + assert result == ["A", "B", "C"] + + def test_transforms_dict_values(self) -> None: + """Should recursively transform dict values.""" + result = deep_transform( + {"key": "value"}, + lambda v: v.upper() if isinstance(v, str) else v, + ) + assert result == {"key": "VALUE"} + + def test_stops_recursion_when_fn_returns_different_value(self) -> None: + """Should stop recursion when fn returns a different value.""" + result = deep_transform( + {"nested": {"deep": "value"}}, + lambda v: "REPLACED" if isinstance(v, dict) and "deep" in v else v, + ) + assert result == {"nested": "REPLACED"} + + def test_handles_nested_structures(self) -> None: + """Should handle deeply nested structures.""" + data = {"level1": {"level2": [{"level3": "value"}]}} + result = deep_transform( + data, + lambda v: v.upper() if isinstance(v, str) else v, + ) + assert result == {"level1": {"level2": [{"level3": "VALUE"}]}} + + +class TestStringDeduplicator: + """Tests for StringDeduplicator class.""" + + def test_returns_original_for_first_occurrence(self) -> None: + """Should return original string on first occurrence.""" + dedup = StringDeduplicator(threshold=10) + result = dedup.process("This is a long string for testing") + assert result == "This is a long string for testing" + + def test_returns_marker_for_duplicate(self) -> None: + """Should return marker for duplicate strings.""" + dedup = StringDeduplicator(threshold=10) + original = "This is a long string for testing" + dedup.process(original) + result = dedup.process(original) + assert result == "[DUPLICATE - SEE ABOVE]" + + def test_ignores_strings_below_threshold(self) -> None: + """Should not deduplicate strings below threshold.""" + dedup = StringDeduplicator(threshold=50) + short = "Short" + assert dedup.process(short) == "Short" + assert dedup.process(short) == "Short" # Not marked as duplicate + + def test_normalizes_whitespace_for_comparison(self) -> None: + """Should treat strings with different whitespace as duplicates.""" + dedup = StringDeduplicator(threshold=10) + dedup.process("Line one\nLine two\nLine three") + result = dedup.process("Line one\n\nLine two\n Line three") + assert result == "[DUPLICATE - SEE ABOVE]" + + def test_reset_clears_seen_strings(self) -> None: + """Should clear seen strings on reset.""" + dedup = StringDeduplicator(threshold=10) + original = "This is a long string for testing" + dedup.process(original) + dedup.reset() + result = dedup.process(original) + assert result == original # Not marked as duplicate + + +class TestTruncateMediaUrl: + """Tests for truncate_media_url function.""" + + def test_truncates_image_data_url(self) -> None: + """Should truncate image data URLs.""" + data_url = "data:image/png;base64," + "A" * 1000 + result = truncate_media_url(data_url) + assert result == "[IMAGE: image/png, ~1000 bytes]" + + def test_truncates_audio_data_url(self) -> None: + """Should truncate audio data URLs.""" + data_url = "data:audio/wav;base64," + "B" * 500 + result = truncate_media_url(data_url) + assert result == "[AUDIO: audio/wav, ~500 bytes]" + + def test_truncates_video_data_url(self) -> None: + """Should truncate video data URLs.""" + data_url = "data:video/mp4;base64," + "C" * 2000 + result = truncate_media_url(data_url) + assert result == "[VIDEO: video/mp4, ~2000 bytes]" + + def test_returns_non_data_url_unchanged(self) -> None: + """Should return non-data URLs unchanged.""" + url = "https://example.com/image.png" + result = truncate_media_url(url) + assert result == url + + def test_returns_regular_string_unchanged(self) -> None: + """Should return regular strings unchanged.""" + text = "Hello, world!" + result = truncate_media_url(text) + assert result == text + + +class TestTruncateMediaPart: + """Tests for truncate_media_part function.""" + + def test_truncates_file_part(self) -> None: + """Should truncate AI SDK file parts.""" + part = { + "type": "file", + "mediaType": "audio/wav", + "data": "A" * 1000, + } + result = truncate_media_part(part) + assert result is not None + assert result["type"] == "file" + assert result["mediaType"] == "audio/wav" + assert result["data"] == "[AUDIO: audio/wav, ~1000 bytes]" + + def test_truncates_image_part_with_data_url(self) -> None: + """Should truncate AI SDK image parts with data URLs.""" + part = { + "type": "image", + "image": "data:image/png;base64," + "B" * 500, + } + result = truncate_media_part(part) + assert result is not None + assert result["image"] == "[IMAGE: image/png, ~500 bytes]" + + def test_truncates_image_part_with_raw_base64(self) -> None: + """Should truncate AI SDK image parts with raw base64.""" + part = { + "type": "image", + "image": "A" * 2000, # Long base64-like string + } + result = truncate_media_part(part) + assert result is not None + assert "[IMAGE: unknown" in result["image"] + + def test_returns_none_for_non_media_dict(self) -> None: + """Should return None for non-media dicts.""" + part = {"type": "text", "content": "Hello"} + result = truncate_media_part(part) + assert result is None + + def test_returns_none_for_non_dict(self) -> None: + """Should return None for non-dict values.""" + assert truncate_media_part("string") is None + assert truncate_media_part(123) is None + assert truncate_media_part(None) is None + assert truncate_media_part([1, 2, 3]) is None + + +class TestJudgeUtils: + """Tests for JudgeUtils class.""" + + def test_build_transcript_basic(self) -> None: + """Should build transcript from messages.""" + messages = [ + {"role": "user", "content": "Hello"}, + {"role": "assistant", "content": "Hi there!"}, + ] + result = JudgeUtils.build_transcript_from_messages(cast(Any, messages)) + assert 'user: "Hello"' in result + assert 'assistant: "Hi there!"' in result + + def test_build_transcript_truncates_media(self) -> None: + """Should truncate base64 media in transcript.""" + messages = [ + { + "role": "user", + "content": "data:image/png;base64," + "A" * 1000, + }, + ] + result = JudgeUtils.build_transcript_from_messages(cast(Any, messages)) + assert "[IMAGE: image/png" in result + assert "A" * 100 not in result # Base64 should be truncated + + def test_build_transcript_handles_empty_messages(self) -> None: + """Should handle empty message list.""" + result = JudgeUtils.build_transcript_from_messages([]) + assert result == "" + + def test_build_transcript_handles_complex_content(self) -> None: + """Should handle complex message content.""" + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": "Describe this image"}, + {"type": "image", "image": "data:image/png;base64," + "B" * 500}, + ], + }, + ] + result = JudgeUtils.build_transcript_from_messages(cast(Any, messages)) + assert "Describe this image" in result + assert "[IMAGE:" in result diff --git a/python/tests/test_scenario_executor_events.py b/python/tests/test_scenario_executor_events.py index b69d6266..5da71bd8 100644 --- a/python/tests/test_scenario_executor_events.py +++ b/python/tests/test_scenario_executor_events.py @@ -2,6 +2,7 @@ from typing import List, Tuple, Dict, Any from scenario import JudgeAgent, UserSimulatorAgent +from scenario._generated.langwatch_api_client.lang_watch_api_client.types import Unset from scenario.agent_adapter import AgentAdapter from scenario.types import AgentInput, ScenarioResult from scenario.scenario_executor import ScenarioExecutor @@ -210,3 +211,72 @@ async def test_event_ordering(executed_events: ExecutedEventsFixture) -> None: assert start_event.timestamp <= snapshot_events[0].timestamp assert snapshot_events[-1].timestamp <= finish_event.timestamp assert start_event.timestamp <= finish_event.timestamp + + +class FailingAgent(AgentAdapter): + """Agent that raises an exception.""" + + async def call(self, input: AgentInput) -> str: + raise RuntimeError("Simulated agent failure") + + +@pytest.mark.asyncio +async def test_emits_error_event_on_exception() -> None: + """Should emit ScenarioRunFinishedEvent with ERROR status when agent throws.""" + mock_reporter = MockEventReporter() + event_bus = ScenarioEventBus(event_reporter=mock_reporter) + + executor = ScenarioExecutor( + name="error scenario", + description="test error handling", + agents=[ + FailingAgent(), + MockUserSimulatorAgent(model="none"), + MockJudgeAgent(model="none", criteria=["test"]), + ], + event_bus=event_bus, + ) + + events: List[ScenarioEvent] = [] + executor.events.subscribe(events.append) + + with pytest.raises(RuntimeError, match="Simulated agent failure"): + await executor.run() + + # Verify we still got the finish event with ERROR status + finish_events = [e for e in events if isinstance(e, ScenarioRunFinishedEvent)] + assert len(finish_events) == 1, "Should emit finish event even on error" + + finish_event = finish_events[0] + assert finish_event.status.value == "ERROR" + results = finish_event.results + assert not isinstance(results, Unset) and results is not None + reasoning = results.reasoning + assert isinstance(reasoning, str) + assert "Simulated agent failure" in reasoning + + +@pytest.mark.asyncio +async def test_error_includes_agent_class_name() -> None: + """Error message should identify which agent threw the exception.""" + import scenario + + executor = ScenarioExecutor( + name="agent error tagging", + description="test agent identification in errors", + agents=[ + FailingAgent(), + MockUserSimulatorAgent(model="none"), + MockJudgeAgent(model="none", criteria=["test"]), + ], + script=[ + scenario.user(), + scenario.agent(), # This will call FailingAgent and throw + ], + ) + + with pytest.raises(RuntimeError) as exc_info: + await executor.run() + + # Error should include the agent class name + assert "FailingAgent" in str(exc_info.value) diff --git a/python/uv.lock b/python/uv.lock index 94eb308f..27048589 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -1049,7 +1049,7 @@ wheels = [ [[package]] name = "langwatch-scenario" -version = "0.7.13" +version = "0.7.14" source = { editable = "." } dependencies = [ { name = "httpx" }, @@ -1057,11 +1057,13 @@ dependencies = [ { name = "langwatch" }, { name = "litellm" }, { name = "openai" }, + { name = "opentelemetry-sdk" }, { name = "pksuid" }, { name = "pydantic" }, { name = "pydantic-settings" }, { name = "pytest" }, { name = "pytest-asyncio" }, + { name = "pytest-rerunfailures" }, { name = "python-dateutil" }, { name = "python-dotenv" }, { name = "rich" }, @@ -1104,6 +1106,7 @@ requires-dist = [ { name = "langwatch", specifier = ">=0.2.19" }, { name = "litellm", specifier = ">=1.49.0" }, { name = "openai", specifier = ">=1.88.0" }, + { name = "opentelemetry-sdk", specifier = ">=1.20.0" }, { name = "pdoc3", marker = "extra == 'dev'" }, { name = "pksuid", specifier = ">=1.1.2" }, { name = "pre-commit", marker = "extra == 'dev'" }, @@ -1114,6 +1117,7 @@ requires-dist = [ { name = "pytest", specifier = ">=8.1.1" }, { name = "pytest-asyncio", specifier = ">=0.26.0" }, { name = "pytest-cov", marker = "extra == 'dev'" }, + { name = "pytest-rerunfailures", specifier = ">=14.0" }, { name = "python-dateutil", specifier = ">=2.9.0.post0" }, { name = "python-dotenv", specifier = ">=1.0.1" }, { name = "respx", marker = "extra == 'dev'" }, @@ -2177,6 +2181,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bc/16/4ea354101abb1287856baa4af2732be351c7bee728065aed451b678153fd/pytest_cov-6.2.1-py3-none-any.whl", hash = "sha256:f5bc4c23f42f1cdd23c70b1dab1bbaef4fc505ba950d53e0081d0730dd7e86d5", size = 24644, upload-time = "2025-06-12T10:47:45.932Z" }, ] +[[package]] +name = "pytest-rerunfailures" +version = "16.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "packaging" }, + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/de/04/71e9520551fc8fe2cf5c1a1842e4e600265b0815f2016b7c27ec85688682/pytest_rerunfailures-16.1.tar.gz", hash = "sha256:c38b266db8a808953ebd71ac25c381cb1981a78ff9340a14bcb9f1b9bff1899e", size = 30889, upload-time = "2025-10-10T07:06:01.238Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/77/54/60eabb34445e3db3d3d874dc1dfa72751bfec3265bd611cb13c8b290adea/pytest_rerunfailures-16.1-py3-none-any.whl", hash = "sha256:5d11b12c0ca9a1665b5054052fcc1084f8deadd9328962745ef6b04e26382e86", size = 14093, upload-time = "2025-10-10T07:06:00.019Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0" diff --git a/scripts/ci-wait.sh b/scripts/ci-wait.sh new file mode 100755 index 00000000..2ca736ae --- /dev/null +++ b/scripts/ci-wait.sh @@ -0,0 +1,53 @@ +#!/usr/bin/env bash +# +# Polls GitHub Actions workflow status until all complete or timeout. +# Outputs JSON result for the agent to analyze. +# +# Usage: ./scripts/ci-wait.sh [branch] [timeout_seconds] [interval_seconds] +# + +set -euo pipefail + +BRANCH="${1:-$(git branch --show-current)}" +TIMEOUT="${2:-600}" +INTERVAL="${3:-30}" +ELAPSED=0 + +echo "Waiting for CI workflows on branch: $BRANCH" +echo "Timeout: ${TIMEOUT}s, Poll interval: ${INTERVAL}s" +echo "" + +while [ $ELAPSED -lt $TIMEOUT ]; do + RAW_RESULT=$(gh run list --branch "$BRANCH" --limit 20 --json status,conclusion,name,databaseId,workflowName,createdAt) + + # Get only the most recent run per workflow (dedupe by workflowName) + RESULT=$(echo "$RAW_RESULT" | jq '[group_by(.workflowName) | .[] | sort_by(.createdAt) | reverse | .[0]]') + + # Check if any still running + PENDING=$(echo "$RESULT" | jq '[.[] | select(.status == "in_progress" or .status == "queued")] | length') + + if [ "$PENDING" -eq 0 ]; then + # Check for failures + FAILED=$(echo "$RESULT" | jq '[.[] | select(.conclusion == "failure")] | length') + + if [ "$FAILED" -gt 0 ]; then + echo "CI FAILED" + echo "" + echo "$RESULT" | jq '.[] | select(.conclusion == "failure") | {name, workflowName, databaseId}' + exit 1 + else + echo "CI PASSED" + echo "" + echo "$RESULT" | jq '.[] | {name, workflowName, conclusion}' + exit 0 + fi + fi + + echo "[$ELAPSED/${TIMEOUT}s] $PENDING workflow(s) still running..." + sleep "$INTERVAL" + ELAPSED=$((ELAPSED + INTERVAL)) +done + +echo "TIMEOUT after ${TIMEOUT}s" +echo "$RESULT" | jq '.[] | {name, status, conclusion}' +exit 2 diff --git a/specs/context-window-exceeded.feature b/specs/context-window-exceeded.feature new file mode 100644 index 00000000..e4e86a03 --- /dev/null +++ b/specs/context-window-exceeded.feature @@ -0,0 +1,26 @@ +@integration +Feature: Context window exceeded error handling + As a scenario test author + I want clear error messages when context limits are exceeded + So that I can identify which agent caused the failure + + Background: + Given a scenario with an agent that returns a large response + + @integration + Scenario: Error identifies the agent that exceeded context + Given a VerboseAgent that returns 200k characters + And a JudgeAgent with a 16k context model + When the scenario runs + Then it should raise a context window error + And the error message should contain "JudgeAgent" + And the error message should mention "token" or "context" + + @integration + Scenario: Reports are still sent when context is exceeded + Given a VerboseAgent that returns 200k characters + And a JudgeAgent with a 16k context model + When the scenario runs and fails + Then a ScenarioRunFinishedEvent should be emitted + And the event status should be "ERROR" + And the event reasoning should contain the error message diff --git a/specs/span-based-evaluation.feature b/specs/span-based-evaluation.feature new file mode 100644 index 00000000..333a6037 --- /dev/null +++ b/specs/span-based-evaluation.feature @@ -0,0 +1,56 @@ +@python +Feature: Span-based evaluation for judge agent + As a scenario test author + I want the judge to evaluate OpenTelemetry spans from my agent + So that I can verify internal behavior beyond just the conversation + + Background: + Given an agent instrumented with OpenTelemetry spans + And a JudgeAgent with span-based criteria + + @e2e + Scenario: Judge evaluates spans from LangWatch decorators + Given an agent using @langwatch.span() decorators + And spans have langwatch.thread.id set to the scenario thread + When the scenario runs + Then the judge can see HTTP call spans + And the judge can see database query spans + And the judge can see tool execution spans + And the judge can evaluate criteria based on span content + + @e2e + Scenario: Judge evaluates spans from native OpenTelemetry API + Given an agent using native OpenTelemetry context managers + And spans have langwatch.thread.id set to the scenario thread + When the scenario runs + Then the judge can evaluate criteria based on span content + + @unit + Scenario: Span collector captures spans by thread ID + Given spans with different thread IDs + When get_spans_for_thread is called + Then only spans matching the thread ID are returned + And child spans inherit thread ID from parents + + @unit + Scenario: Span digest formatter produces readable output + Given a list of OpenTelemetry spans + When formatted into a digest + Then output includes span names and durations + And output includes relevant attributes + And duplicate content is deduplicated + And media content is truncated + + @unit + Scenario: Media truncation reduces token usage + Given span attributes containing base64 data URLs + When processed by truncate_media + Then data URLs are replaced with readable markers + And markers include mime type and approximate size + + @unit + Scenario: String deduplication reduces token usage + Given repeated long strings across spans + When processed by the deduplicator + Then first occurrence is kept + And subsequent occurrences are replaced with markers