diff --git a/javascript/src/domain/core/execution.ts b/javascript/src/domain/core/execution.ts index 457f9660..516a0c09 100644 --- a/javascript/src/domain/core/execution.ts +++ b/javascript/src/domain/core/execution.ts @@ -11,6 +11,11 @@ import type { ScenarioConfig } from "../scenarios"; * */ export interface ScenarioResult { + /** + * Unique identifier for this scenario run. + */ + runId: string; + /** * Indicates whether the scenario was successful. */ diff --git a/javascript/src/execution/scenario-execution.ts b/javascript/src/execution/scenario-execution.ts index 310a03c4..2d6a59a4 100644 --- a/javascript/src/execution/scenario-execution.ts +++ b/javascript/src/execution/scenario-execution.ts @@ -37,7 +37,6 @@ import { generateScenarioId, generateScenarioRunId, generateThreadId, - getBatchRunId, } from "../utils/ids"; import { Logger } from "../utils/logger"; @@ -183,13 +182,21 @@ export class ScenarioExecution implements ScenarioExecutionLike { public readonly events$: Observable = this.eventSubject.asObservable(); + /** Batch run ID for grouping scenario runs */ + private batchRunId: string; + + /** The run ID for the current execution */ + private scenarioRunId?: string; + /** * Creates a new ScenarioExecution instance. * * @param config - The scenario configuration containing agents, settings, and metadata * @param script - The ordered sequence of script steps that define the test flow + * @param batchRunId - Batch run ID for grouping scenario runs */ - constructor(config: ScenarioConfig, script: ScriptStep[]) { + constructor(config: ScenarioConfig, script: ScriptStep[], batchRunId: string) { + this.batchRunId = batchRunId; this.config = { id: config.id ?? generateScenarioId(), name: config.name, @@ -243,8 +250,12 @@ export class ScenarioExecution implements ScenarioExecutionLike { * @param result - The final scenario result (without messages/timing, which will be added automatically) */ private setResult( - result: Omit + result: Omit ): void { + if (!this.scenarioRunId) { + throw new Error("Cannot set result: scenarioRunId has not been initialized. This is a bug in ScenarioExecution."); + } + const agentRoleAgentsIdx = this.agents .map((agent, i) => ({ agent, idx: i })) .filter(({ agent }) => agent.role === AgentRole.AGENT) @@ -257,6 +268,7 @@ export class ScenarioExecution implements ScenarioExecutionLike { const totalAgentTime = agentTimes.reduce((sum, time) => sum + time, 0); this._result = { + runId: this.scenarioRunId, ...result, messages: this.state.messages, totalTime: this.totalTime, @@ -315,6 +327,7 @@ export class ScenarioExecution implements ScenarioExecutionLike { this.reset(); const scenarioRunId = generateScenarioRunId(); + this.scenarioRunId = scenarioRunId; this.logger.debug(`[${this.config.id}] Generated run ID: ${scenarioRunId}`); this.emitRunStarted({ scenarioRunId }); @@ -1212,7 +1225,7 @@ export class ScenarioExecution implements ScenarioExecutionLike { return { type: "placeholder", // This will be replaced by the specific event type timestamp: Date.now(), - batchRunId: getBatchRunId(), + batchRunId: this.batchRunId, scenarioId: this.config.id, scenarioRunId, scenarioSetId: this.config.setId, diff --git a/javascript/src/runner/__tests__/run.test.ts b/javascript/src/runner/__tests__/run.test.ts new file mode 100644 index 00000000..d49fff57 --- /dev/null +++ b/javascript/src/runner/__tests__/run.test.ts @@ -0,0 +1,353 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { run, type RunOptions } from "../run"; +import { AgentRole, type AgentAdapter, type AgentInput } from "../../domain"; +import type { ScenarioEvent } from "../../events/schema"; + +// Mock the EventBus - must use function keyword for constructor +vi.mock("../../events/event-bus", () => ({ + EventBus: vi.fn().mockImplementation(function (this: unknown, config: unknown) { + return { + config, + listen: vi.fn(), + subscribeTo: vi.fn().mockReturnValue({ unsubscribe: vi.fn() }), + drain: vi.fn().mockResolvedValue(undefined), + }; + }), +})); + +// Mock the tracing setup +vi.mock("../../tracing/setup", () => ({ + observabilityHandle: undefined, +})); + +// Mock getLangWatchTracer +vi.mock("langwatch", () => ({ + getLangWatchTracer: vi.fn().mockReturnValue({ + startSpan: vi.fn().mockReturnValue({ + end: vi.fn(), + spanContext: vi.fn().mockReturnValue({ traceId: "test-trace-id" }), + }), + withActiveSpan: vi.fn().mockImplementation(async (_name, _opts, _ctx, fn) => { + const mockSpan = { + setType: vi.fn(), + setInput: vi.fn(), + setOutput: vi.fn(), + setMetrics: vi.fn(), + spanContext: vi.fn().mockReturnValue({ traceId: "test-trace-id" }), + }; + return fn(mockSpan); + }), + }), +})); + +// Create a simple test agent that immediately succeeds +class TestAgent implements AgentAdapter { + name = "TestAgent"; + role = AgentRole.AGENT; + + async call(_input: AgentInput): Promise { + return "Test response"; + } +} + +// Create a simple judge that always succeeds +class TestJudgeAgent implements AgentAdapter { + name = "TestJudge"; + role = AgentRole.JUDGE; + + async call(_input: AgentInput) { + return { + success: true, + reasoning: "Test passed", + metCriteria: ["criterion1"], + unmetCriteria: [], + }; + } +} + +describe("run", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + describe("runId", () => { + it("should include runId in the result", async () => { + const result = await run({ + name: "Test Scenario", + description: "A test scenario", + agents: [new TestAgent(), new TestJudgeAgent()], + script: [ + async (_state, executor) => { + await executor.succeed("Immediate success"); + }, + ], + }); + + expect(result.runId).toBeDefined(); + expect(result.runId).toMatch(/^scenariorun_/); + }); + + it("should generate unique runIds for each run", async () => { + const config = { + name: "Test Scenario", + description: "A test scenario", + agents: [new TestAgent(), new TestJudgeAgent()], + script: [ + async (_state: unknown, executor: { succeed: (msg: string) => Promise }) => { + await executor.succeed("Immediate success"); + }, + ], + }; + + const result1 = await run(config); + const result2 = await run(config); + + expect(result1.runId).not.toBe(result2.runId); + }); + }); + + describe("batchRunId", () => { + it("should use provided batchRunId from options", async () => { + const { EventBus } = await import("../../events/event-bus"); + const capturedEvents: ScenarioEvent[] = []; + + // Override the mock to capture events - must use function keyword for constructor + vi.mocked(EventBus).mockImplementation(function (this: unknown, config: unknown) { + return { + config, + listen: vi.fn(), + subscribeTo: vi.fn().mockImplementation((events$) => { + const subscription = events$.subscribe((event: ScenarioEvent) => { + capturedEvents.push(event); + }); + return subscription; + }), + drain: vi.fn().mockResolvedValue(undefined), + }; + }); + + await run( + { + name: "Test Scenario", + description: "A test scenario", + agents: [new TestAgent(), new TestJudgeAgent()], + script: [ + async (_state: unknown, executor: { succeed: (msg: string) => Promise }) => { + await executor.succeed("Immediate success"); + }, + ], + }, + { + batchRunId: "custom_batch_123", + } + ); + + // Check that events contain the custom batchRunId + const runStartedEvent = capturedEvents.find((e) => e.type === "SCENARIO_RUN_STARTED"); + expect(runStartedEvent?.batchRunId).toBe("custom_batch_123"); + }); + + it("should auto-generate batchRunId when not provided", async () => { + const { EventBus } = await import("../../events/event-bus"); + const capturedEvents: ScenarioEvent[] = []; + + vi.mocked(EventBus).mockImplementation(function (this: unknown, config: unknown) { + return { + config, + listen: vi.fn(), + subscribeTo: vi.fn().mockImplementation((events$) => { + const subscription = events$.subscribe((event: ScenarioEvent) => { + capturedEvents.push(event); + }); + return subscription; + }), + drain: vi.fn().mockResolvedValue(undefined), + }; + }); + + await run({ + name: "Test Scenario", + description: "A test scenario", + agents: [new TestAgent(), new TestJudgeAgent()], + script: [ + async (_state: unknown, executor: { succeed: (msg: string) => Promise }) => { + await executor.succeed("Immediate success"); + }, + ], + }); + + const runStartedEvent = capturedEvents.find((e) => e.type === "SCENARIO_RUN_STARTED"); + expect(runStartedEvent?.batchRunId).toBeDefined(); + expect(runStartedEvent?.batchRunId).toMatch(/^scenariobatch_/); + }); + }); + + describe("langwatch config", () => { + it("should use provided langwatch config for EventBus", async () => { + const { EventBus } = await import("../../events/event-bus"); + + const options: RunOptions = { + langwatch: { + endpoint: "https://custom.endpoint.com", + apiKey: "custom-api-key", + }, + }; + + await run( + { + name: "Test Scenario", + description: "A test scenario", + agents: [new TestAgent(), new TestJudgeAgent()], + script: [ + async (_state: unknown, executor: { succeed: (msg: string) => Promise }) => { + await executor.succeed("Immediate success"); + }, + ], + }, + options + ); + + expect(EventBus).toHaveBeenCalledWith({ + endpoint: "https://custom.endpoint.com", + apiKey: "custom-api-key", + }); + }); + }); + + /** + * Concurrency Safety Tests + * + * These tests verify that concurrent scenario runs are properly isolated. + * Each run() call creates its own EventBus instance with its own config, + * ensuring that events from different projects don't get mixed up. + * + * This is critical because: + * 1. Multiple users may run scenarios simultaneously + * 2. Each project has its own API key for authentication + * 3. Events must be routed to the correct project based on API key + * + * The old implementation used environment variables (process-wide state) + * which required a mutex to prevent concurrent runs from interfering. + * The new implementation uses per-call programmatic config, eliminating + * the need for a mutex and enabling true concurrency. + */ + describe("concurrency safety", () => { + it("should create separate EventBus instances for concurrent runs", async () => { + const { EventBus } = await import("../../events/event-bus"); + const eventBusConfigs: Array<{ endpoint: string; apiKey: string }> = []; + + // Track all EventBus instantiations + vi.mocked(EventBus).mockImplementation(function (this: unknown, config: { endpoint: string; apiKey: string }) { + eventBusConfigs.push(config); + return { + config, + listen: vi.fn(), + subscribeTo: vi.fn().mockReturnValue({ unsubscribe: vi.fn() }), + drain: vi.fn().mockResolvedValue(undefined), + }; + }); + + const createScenarioConfig = () => ({ + name: "Test Scenario", + description: "A test scenario", + agents: [new TestAgent(), new TestJudgeAgent()], + script: [ + async (_state: unknown, executor: { succeed: (msg: string) => Promise }) => { + await executor.succeed("Immediate success"); + }, + ], + }); + + // Run three scenarios concurrently with different API keys + await Promise.all([ + run(createScenarioConfig(), { + langwatch: { endpoint: "https://api.example.com", apiKey: "project-a-key" }, + }), + run(createScenarioConfig(), { + langwatch: { endpoint: "https://api.example.com", apiKey: "project-b-key" }, + }), + run(createScenarioConfig(), { + langwatch: { endpoint: "https://api.example.com", apiKey: "project-c-key" }, + }), + ]); + + // Verify three separate EventBus instances were created + expect(EventBus).toHaveBeenCalledTimes(3); + + // Verify each instance has the correct, unique API key + const apiKeys = eventBusConfigs.map((c) => c.apiKey); + expect(apiKeys).toContain("project-a-key"); + expect(apiKeys).toContain("project-b-key"); + expect(apiKeys).toContain("project-c-key"); + + // Verify no API keys were mixed up (each appears exactly once) + expect(apiKeys.filter((k) => k === "project-a-key")).toHaveLength(1); + expect(apiKeys.filter((k) => k === "project-b-key")).toHaveLength(1); + expect(apiKeys.filter((k) => k === "project-c-key")).toHaveLength(1); + }); + + it("should isolate events between concurrent runs", async () => { + const { EventBus } = await import("../../events/event-bus"); + + // Track events per API key + const eventsByApiKey = new Map(); + + vi.mocked(EventBus).mockImplementation(function (this: unknown, config: { endpoint: string; apiKey: string }) { + const events: ScenarioEvent[] = []; + eventsByApiKey.set(config.apiKey, events); + + return { + config, + listen: vi.fn(), + subscribeTo: vi.fn().mockImplementation((events$) => { + const subscription = events$.subscribe((event: ScenarioEvent) => { + events.push(event); + }); + return subscription; + }), + drain: vi.fn().mockResolvedValue(undefined), + }; + }); + + const createScenarioConfig = (name: string) => ({ + name, + description: `Scenario ${name}`, + agents: [new TestAgent(), new TestJudgeAgent()], + script: [ + async (_state: unknown, executor: { succeed: (msg: string) => Promise }) => { + await executor.succeed(`Success from ${name}`); + }, + ], + }); + + // Run scenarios concurrently + await Promise.all([ + run(createScenarioConfig("Scenario-A"), { + langwatch: { endpoint: "https://api.example.com", apiKey: "key-a" }, + }), + run(createScenarioConfig("Scenario-B"), { + langwatch: { endpoint: "https://api.example.com", apiKey: "key-b" }, + }), + ]); + + // Verify each API key received only its own events + const eventsA = eventsByApiKey.get("key-a") ?? []; + const eventsB = eventsByApiKey.get("key-b") ?? []; + + // Each should have RUN_STARTED and RUN_FINISHED at minimum + expect(eventsA.length).toBeGreaterThanOrEqual(2); + expect(eventsB.length).toBeGreaterThanOrEqual(2); + + // Verify scenario names are correct (no cross-contamination) + const runStartedA = eventsA.find((e) => e.type === "SCENARIO_RUN_STARTED"); + const runStartedB = eventsB.find((e) => e.type === "SCENARIO_RUN_STARTED"); + + expect(runStartedA?.metadata?.name).toBe("Scenario-A"); + expect(runStartedB?.metadata?.name).toBe("Scenario-B"); + }); + }); +}); diff --git a/javascript/src/runner/run.ts b/javascript/src/runner/run.ts index 542a7285..71df178e 100644 --- a/javascript/src/runner/run.ts +++ b/javascript/src/runner/run.ts @@ -4,6 +4,21 @@ * This file contains the core `run` function that orchestrates the execution * of scenario tests, managing the interaction between user simulators, agents under test, * and judge agents to determine test success or failure. + * + * ## Concurrency Model + * + * Each call to `run()` creates its own isolated EventBus instance with its own + * configuration. This design enables safe concurrent execution of multiple scenarios, + * even when they have different LangWatch configurations (e.g., different API keys + * for different projects). + * + * Key properties: + * - **Isolation**: Events from one run never leak to another run's EventBus + * - **No shared state**: No global singletons or process-wide environment mutation + * - **Thread-safe**: Multiple `run()` calls can execute in parallel via Promise.all() + * + * This replaces an earlier design that used environment variables and required + * a mutex to prevent concurrent runs from interfering with each other. */ import { AssistantContent, ToolContent, CoreMessage } from "ai"; import { Subscription } from "rxjs"; @@ -17,7 +32,30 @@ import { import { EventBus } from "../events/event-bus"; import { ScenarioExecution } from "../execution"; import { proceed } from "../script"; -import { generateThreadId } from "../utils/ids"; +import { generateThreadId, getBatchRunId } from "../utils/ids"; +import { Logger } from "../utils/logger"; + +const logger = Logger.create("scenario.run"); + +/** + * Configuration for LangWatch event reporting. + */ +export interface LangwatchConfig { + /** The endpoint URL to send events to */ + endpoint: string; + /** The API key for authentication */ + apiKey: string; +} + +/** + * Options for running a scenario. + */ +export interface RunOptions { + /** LangWatch configuration for event reporting. Overrides environment variables. */ + langwatch?: LangwatchConfig; + /** Batch run ID for grouping scenario runs. Overrides SCENARIO_BATCH_RUN_ID env var. */ + batchRunId?: string; +} /** * High-level interface for running a scenario test. @@ -68,7 +106,7 @@ import { generateThreadId } from "../utils/ids"; * main(); * ``` */ -export async function run(cfg: ScenarioConfig): Promise { +export async function run(cfg: ScenarioConfig, options?: RunOptions): Promise { if (!cfg.name) { throw new Error("Scenario name is required"); } @@ -96,16 +134,30 @@ export async function run(cfg: ScenarioConfig): Promise { } const steps = cfg.script || [proceed()]; - const execution = new ScenarioExecution(cfg, steps); + + // Resolve batchRunId: prefer explicit option, fallback to generated + const passedBatchRunId = options?.batchRunId; + const fallbackBatchRunId = getBatchRunId(); + const resolvedBatchRunId = passedBatchRunId ?? fallbackBatchRunId; + + logger.debug("batchRunId resolution", { + passedBatchRunId, + fallbackBatchRunId, + resolvedBatchRunId, + optionsKeys: options ? Object.keys(options) : "undefined", + }); + + const execution = new ScenarioExecution(cfg, steps, resolvedBatchRunId); let eventBus: EventBus | null = null; let subscription: Subscription | null = null; try { const envConfig = getEnv(); + // Use programmatic config if provided, otherwise fall back to env vars eventBus = new EventBus({ - endpoint: envConfig.LANGWATCH_ENDPOINT, - apiKey: envConfig.LANGWATCH_API_KEY, + endpoint: options?.langwatch?.endpoint ?? envConfig.LANGWATCH_ENDPOINT, + apiKey: options?.langwatch?.apiKey ?? envConfig.LANGWATCH_API_KEY, }); eventBus.listen(); diff --git a/package.json b/package.json new file mode 100644 index 00000000..7d809a2a --- /dev/null +++ b/package.json @@ -0,0 +1,38 @@ +{ + "name": "@langwatch/scenario", + "version": "0.4.0", + "private": true, + "scripts": { + "prepare": "cd javascript && npm install --legacy-peer-deps && npm run build" + }, + "main": "javascript/dist/index.js", + "module": "javascript/dist/index.mjs", + "types": "javascript/dist/index.d.ts", + "exports": { + ".": { + "types": "./javascript/dist/index.d.ts", + "require": "./javascript/dist/index.js", + "import": "./javascript/dist/index.mjs" + }, + "./integrations/vitest/reporter": { + "types": "./javascript/dist/integrations/vitest/reporter.d.ts", + "require": "./javascript/dist/integrations/vitest/reporter.js", + "import": "./javascript/dist/integrations/vitest/reporter.mjs" + }, + "./integrations/vitest/setup": { + "types": "./javascript/dist/integrations/vitest/setup.d.ts", + "require": "./javascript/dist/integrations/vitest/setup.js", + "import": "./javascript/dist/integrations/vitest/setup.mjs" + }, + "./integrations/vitest/setup-global": { + "types": "./javascript/dist/integrations/vitest/setup-global.d.ts", + "require": "./javascript/dist/integrations/vitest/setup-global.js", + "import": "./javascript/dist/integrations/vitest/setup-global.mjs" + }, + "./integrations/vitest/config": { + "types": "./javascript/dist/integrations/vitest/config.d.ts", + "require": "./javascript/dist/integrations/vitest/config.js", + "import": "./javascript/dist/integrations/vitest/config.mjs" + } + } +}