diff --git a/agent/llmagent/empty_response_test.go b/agent/llmagent/empty_response_test.go new file mode 100644 index 00000000..a65e29cd --- /dev/null +++ b/agent/llmagent/empty_response_test.go @@ -0,0 +1,133 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package llmagent_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/ai-sdk-go/agent" + "github.com/redpanda-data/ai-sdk-go/agent/llmagent" + "github.com/redpanda-data/ai-sdk-go/llm" + "github.com/redpanda-data/ai-sdk-go/llm/fakellm" + "github.com/redpanda-data/ai-sdk-go/store/session" +) + +// TestRun_EmptyContentNotPersisted locks in the agent-loop guard against +// session poisoning by empty assistant messages. +// +// Sibling case to PR #116. Where #116 surfaced *truncated* tool args +// instead of corrupting state, this guards against *empty* content. +// When a provider returns a Response whose Message has zero content +// blocks (max_tokens before any block was emitted, the only block was +// a partial tool_use that stream finalisation correctly dropped, or +// refusal with no text emitted), Anthropic and most other providers +// reject any subsequent replay with `messages.N.content: Field +// required` — once the empty turn is in session state, every +// following call 400s and the conversation is wedged forever. 
+// +// The guard sits at the same place adk-go's AppendEvent honours its +// `Event.Partial` flag: the session-store boundary, the single +// chokepoint for "what gets persisted." The MessageEvent still fires +// (observers see what happened) and the FinishReason still propagates +// (terminal-reason handling fires below); only the persistence step +// is skipped. +func TestRun_EmptyContentNotPersisted(t *testing.T) { + t.Parallel() + + model := fakellm.NewFakeModel() + model.When(fakellm.Any()). + ThenRespondWith(func(_ *llm.Request, _ *fakellm.CallContext) (*llm.Response, error) { + // max_tokens hit before any content block emitted: provider + // returns a Response with empty content + FinishReasonLength. + return &llm.Response{ + Message: llm.Message{ + Role: llm.RoleAssistant, + Content: nil, + }, + FinishReason: llm.FinishReasonLength, + }, nil + }) + + ag, err := llmagent.New("test-agent", "You are a helpful assistant", model) + require.NoError(t, err) + + sess := &session.State{ + ID: "test-session", + Messages: []llm.Message{llm.NewMessage(llm.RoleUser, llm.NewTextPart("hello"))}, + } + inv := agent.NewInvocationMetadata(sess, agent.Info{}) + + events := collectEvents(t, ag.Run(t.Context(), inv)) + + // FinishReason still propagates so callers can react. + endEvent := findInvocationEndEvent(events) + require.NotNil(t, endEvent) + assert.Equal(t, agent.FinishReasonLength, endEvent.FinishReason, + "finish reason must propagate even when content is empty") + + // MessageEvent still fires so observers can see what happened. + messageEvents := filterEvents[agent.MessageEvent](events) + assert.NotEmpty(t, messageEvents, "MessageEvent must still fire so observers see the empty response") + + // Session must NOT contain the empty assistant message — that's the + // whole point. Only the original user message stays. 
+ require.Len(t, sess.Messages, 1, "empty assistant message must not be persisted to session") + assert.Equal(t, llm.RoleUser, sess.Messages[0].Role) + + // Sanity: every persisted message has non-empty content. This is + // the invariant Anthropic et al. require — every message in a + // replayed array must have at least one content block. + for i, m := range sess.Messages { + assert.NotEmpty(t, m.Content, "session.Messages[%d] must have non-empty content", i) + } +} + +// TestRun_NonEmptyContentStillPersisted is the negative control: +// the guard must not over-fire and skip messages that DO have content. +// If this regresses, normal turns stop being persisted and the agent +// loses all state. +func TestRun_NonEmptyContentStillPersisted(t *testing.T) { + t.Parallel() + + model := fakellm.NewFakeModel() + model.When(fakellm.Any()). + ThenRespondWith(func(_ *llm.Request, _ *fakellm.CallContext) (*llm.Response, error) { + return &llm.Response{ + Message: llm.Message{ + Role: llm.RoleAssistant, + Content: []*llm.Part{llm.NewTextPart("hi back")}, + }, + FinishReason: llm.FinishReasonStop, + }, nil + }) + + ag, err := llmagent.New("test-agent", "You are a helpful assistant", model) + require.NoError(t, err) + + sess := &session.State{ + ID: "test-session", + Messages: []llm.Message{llm.NewMessage(llm.RoleUser, llm.NewTextPart("hello"))}, + } + inv := agent.NewInvocationMetadata(sess, agent.Info{}) + + collectEvents(t, ag.Run(t.Context(), inv)) + + require.Len(t, sess.Messages, 2, "non-empty assistant message must be persisted") + assert.Equal(t, llm.RoleAssistant, sess.Messages[1].Role) + assert.NotEmpty(t, sess.Messages[1].Content) +} diff --git a/agent/llmagent/llmagent.go b/agent/llmagent/llmagent.go index 0767ed1e..11ae90d7 100644 --- a/agent/llmagent/llmagent.go +++ b/agent/llmagent/llmagent.go @@ -269,8 +269,28 @@ func (a *LLMAgent) executeSingleTurn( // Update usage tracking agent.AddUsage(inv, resp.Usage) - // Add assistant message to session (single source 
of truth) - sess.Messages = append(sess.Messages, resp.Message) + // Add assistant message to session (single source of truth). + // + // Skip persistence when Content is empty. Sources we have observed: + // - max_tokens hit before any content block was emitted + // - the only content block was a partial tool_use that stream + // finalisation correctly dropped (PR #116) + // - refusal / safety filter with no text emitted + // + // In all of these the FinishReason carries the truth (Length, + // ContentFilter, etc.) and the terminal-reason handling below + // surfaces it. The MessageEvent below still fires so observers + // see what happened. Persisting the empty Message would poison + // every subsequent replay — Anthropic and most other providers + // reject `messages.N.content` arrays with `Field required` and + // the conversation is permanently wedged. + // + // Same shape as adk-go's `AppendEvent` check on `Event.Partial`: + // the session-store boundary is the right place to drop responses + // that carry no usable content. + if len(resp.Message.Content) > 0 { + sess.Messages = append(sess.Messages, resp.Message) + } // Emit message event if !yield(agent.MessageEvent{ diff --git a/docs/empty-content-persistence.md b/docs/empty-content-persistence.md new file mode 100644 index 00000000..b0e3c0af --- /dev/null +++ b/docs/empty-content-persistence.md @@ -0,0 +1,139 @@ +# Empty-content responses: open design question + +Status: parked. PR #123 ships the minimal fix — agent-loop guard at the +session-store boundary. The deeper question of where the invariant +should live is not resolved. + +## The bug + +After PR #116 landed (drop partial `tool_use` blocks at stream +finalisation), an unrelated wedge surfaced in production: agent +sessions started failing every call with + +``` +messages.15.content: Field required +``` + +from Anthropic. 
The cause: a previous turn finalised with `Content = +[]` (max_tokens hit before any non-partial block was emitted, or the +only block was a partial tool_use that #116 correctly dropped, or a +refusal with no text). The agent loop's `sess.Messages = append(...)` +ran *before* the FinishReason terminal-handling check, so the empty +`Message` landed in session state. Every subsequent replay then sent +that empty array to Anthropic, which 400'd. Pod restarts didn't help — +the poisoned message lives in persisted session state. + +## What we shipped (PR #123) + +Minimal guard in `executeSingleTurn` (`agent/llmagent/llmagent.go`): + +```go +if len(resp.Message.Content) > 0 { + sess.Messages = append(sess.Messages, resp.Message) +} +``` + +The MessageEvent still fires; the FinishReason still propagates; the +terminal-reason handling still terminates the loop cleanly. Only +persistence is skipped. + +## What we considered and rejected + +### A. Provider returns error on empty content + +Move the check into each `response_mapper`: if `len(content) == 0`, +return `ErrResponseMapping`. The agent loop's existing terminal-error +path then fires (`generate()` returns err, loop exits cleanly). + +Rejected because it swallows the FinishReason signal. `Length + empty` +is a real, valid state (the model ran out of budget before producing +anything) — surfacing it as a generic mapping error loses the +information the caller needs to decide whether to retry with higher +`max_tokens`. Same for `ContentFilter + empty` (refusal). The agent +loop's existing handling for these reasons is correct and should keep +firing. + +### B. Provider error only on `Stop + empty` + +Conditional version of (A): only error when the stop reason is `Stop` +(end_turn / clean stop) with empty content, since *that* is the +genuinely anomalous case. Length and ContentFilter with empty content +flow through normally. + +Rejected as speculative paranoia: `Stop + empty` is theoretical and +hasn't been observed in the wild. 
Adding code for a phantom case adds +complexity without value. If it ever happens we can revisit. + +### C. Add a `Partial bool` field to `llm.Response` + +Mirror adk-go's `Event.Partial` flag. Providers set it when the +response is a non-result; the agent loop respects the flag at the +persistence boundary. + +Rejected as redundant. adk-go needs Partial because they expose +streaming chunks at the API surface — they have to distinguish +"chunk-with-content" from "complete." This SDK's agent loop sees only +the final aggregated `Response`, so `Partial == (len(Content) == 0)` +in our architecture. The flag adds ceremony without distinguishing +power. + +### D. Key persistence on FinishReason instead of content length + +"Skip persistence when FinishReason == Length." + +Rejected because Length with non-empty content is valid and worth +persisting — e.g., the model completed 3 of 4 parallel tool calls +before hitting `max_tokens` (PR #116's central scenario). The caller +wants those 3 completed tool_use blocks to inspect. Length alone +isn't synonymous with "unusable result"; `len(Content) == 0` is. + +## The open question + +The session-store boundary is the *right place* to honour "non-result +responses don't get persisted." That part is settled. What's not +settled: + +1. **Should providers also be involved?** Right now, the invariant is + enforced only in `llmagent.go`. A non-llmagent caller that drives + `model.Generate()` directly and persists the result has to know to + apply the same guard. That's not great encapsulation. Options: + + - Document the invariant: "callers must skip persistence when + `len(resp.Message.Content) == 0`." Cheapest. Relies on docs. + - Add a helper: `resp.IsPersistable() bool` returning + `len(Content) > 0`. Self-documenting at call sites. + - Move the guard into the session store itself (any + `sess.Append(msg)` helper rejects empty Content). Requires + introducing a session abstraction the SDK doesn't currently + own at this layer. 
+ +2. **Should we have telemetry?** Right now empty-content responses + silently disappear into the void. A counter/log would tell us how + often this fires and whether some provider/scenario is leaking + them at an unexpected rate. Cheap to add, useful for spotting + regressions. + +3. **Is there a class of "incomplete" responses that don't have + `len(Content) == 0` but still shouldn't be persisted?** PR #116's + reproducer (truncated tool args yielding `unexpected end of JSON + input`) was such a case before the fix. Are there other shapes we + haven't seen yet? Hard to know without more production exposure. + +## Move on + +For now: PR #123 fixes the production bug. The architectural +improvements above are not blocking. Revisit when: + +- Telemetry shows empty-content responses are happening at a + non-trivial rate, or +- A non-llmagent caller hits the same wedge, or +- A new "incomplete" response shape surfaces that the simple + `len(Content) == 0` check doesn't catch. + +## References + +- PR #116 — surface truncation instead of corrupting state when + tool_use is cut off (the sibling fix) +- PR #123 — the agent-loop guard shipped here +- adk-go `session/inmemory.go::AppendEvent` — prior art for + guarding the persistence boundary on a "not-final" signal diff --git a/llm/part.go b/llm/part.go index db4cf29c..2c9ebb56 100644 --- a/llm/part.go +++ b/llm/part.go @@ -167,6 +167,29 @@ type ToolRequest struct { Arguments json.RawMessage `json:"arguments"` } +// ArgumentsAsObject decodes Arguments into a JSON object for provider APIs +// that require structured input (Anthropic tool_use.input, Bedrock tool_use +// input, Gemini function_call.args). Empty bytes decode to an empty object, +// which matches what providers actually send for no-arg tools. Invalid JSON +// also decodes to an empty object rather than erroring: corrupt arguments +// can end up in session state when a streaming turn is cut short +// mid-accumulation (e.g. 
stop_reason=max_tokens during input_json_delta), +// and without this fallback every replay of that session fails message +// mapping with "unexpected end of JSON input"; the paired tool_result already +// carries the parse error as context. JSON null also yields an empty object. +func (t *ToolRequest) ArgumentsAsObject() map[string]any { + if len(t.Arguments) == 0 { + return map[string]any{} + } + + var input map[string]any + if err := json.Unmarshal(t.Arguments, &input); err != nil || input == nil { // null decodes to a nil map with err == nil + return map[string]any{} + } + + return input +} + // ToolResponse represents the result of executing a tool. // This is sent back to the model to continue the conversation. type ToolResponse struct { diff --git a/llm/part_test.go b/llm/part_test.go new file mode 100644 index 00000000..b98aa1ab --- /dev/null +++ b/llm/part_test.go @@ -0,0 +1,51 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package llm_test + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/redpanda-data/ai-sdk-go/llm" +) + +func TestToolRequest_ArgumentsAsObject(t *testing.T) { + t.Parallel() + + cases := []struct { + name string + in json.RawMessage + want map[string]any + }{ + {"nil", nil, map[string]any{}}, + {"empty", json.RawMessage(""), map[string]any{}}, + {"truncated", json.RawMessage(`{"query":`), map[string]any{}}, + {"not an object", json.RawMessage(`"oops"`), map[string]any{}}, + {"null", json.RawMessage(`null`), map[string]any{}}, + {"valid empty", json.RawMessage(`{}`), map[string]any{}}, + {"valid", json.RawMessage(`{"query":"SELECT 1"}`), map[string]any{"query": "SELECT 1"}}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + tr := &llm.ToolRequest{Arguments: tc.in} + assert.Equal(t, tc.want, tr.ArgumentsAsObject()) + }) + } +} diff --git a/providers/anthropic/request_mapper_test.go b/providers/anthropic/request_mapper_test.go new file mode 100644 index 00000000..066fb87f --- /dev/null +++ b/providers/anthropic/request_mapper_test.go @@ -0,0 +1,140 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package anthropic + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/redpanda-data/ai-sdk-go/llm" +) + +// TestRequestMapper_WedgedSessionReplay reproduces a session-replay wedge. 
+// +// When a streaming turn is cut off mid-tool-call (stop_reason=max_tokens +// during input_json_delta accumulation), the partial tool arguments get +// persisted to session state as truncated JSON (e.g. `{"query":`). The agent +// notices at tool-execution time and records an error tool_result, so the +// turn itself completes, but the broken assistant tool_use block stays in +// history. +// +// Every subsequent turn then rebuilds the full conversation and dies in +// mapAssistantMessage with "unexpected end of JSON input", permanently +// wedging the session. +func TestRequestMapper_WedgedSessionReplay(t *testing.T) { + t.Parallel() + + mapper := NewRequestMapper(&Config{ + ModelName: "claude-opus-4-5-20250929", + MaxTokens: 4096, + }) + + // Mirrors the shape of the poisoned session at the point of replay: + // user prompt -> assistant tool_use with truncated args -> user tool_result + // carrying the original parse error -> user text asking to try again. + req := &llm.Request{ + Messages: []llm.Message{ + { + Role: llm.RoleUser, + Content: []*llm.Part{llm.NewTextPart("rerun the analysis")}, + }, + { + Role: llm.RoleAssistant, + Content: []*llm.Part{ + llm.NewToolRequestPart(&llm.ToolRequest{ + ID: "toolu_truncated", + Name: "db_query", + Arguments: json.RawMessage(`{"query":`), // truncated mid-stream + }), + }, + }, + { + Role: llm.RoleUser, + Content: []*llm.Part{ + llm.NewToolResponsePart(&llm.ToolResponse{ + ID: "toolu_truncated", + Name: "db_query", + Error: `tool "db_query" arguments must be a JSON object: unexpected end of JSON input`, + }), + }, + }, + { + Role: llm.RoleUser, + Content: []*llm.Part{llm.NewTextPart("run the analysis again")}, + }, + }, + } + + apiReq, err := mapper.ToProvider(req) + require.NoError(t, err, "wedged session must be mappable so the conversation can recover") + require.Len(t, apiReq.Messages, 4) + + // The truncated tool_use survives to Anthropic as a valid JSON object. 
+ // An empty object is acceptable: the paired tool_result carries the real + // parse error, and the model has enough context to retry. + assistant := apiReq.Messages[1] + require.Len(t, assistant.Content, 1) + + toolUse := assistant.Content[0].OfToolUse + require.NotNil(t, toolUse, "assistant tool_use block must be emitted") + assert.Equal(t, "toolu_truncated", toolUse.ID) + + input, ok := toolUse.Input.(map[string]any) + require.True(t, ok, "tool_use input must be a JSON object, got %T", toolUse.Input) + assert.Empty(t, input, "truncated arguments should collapse to an empty object") +} + +// TestRequestMapper_EmptyToolArguments covers the no-arg tool call case: +// when ToolRequest.Arguments is empty bytes (the wire form for tools with +// no parameters, also the value FinalizeToolArgs coerces empty streaming +// accumulations to), the mapper must emit a valid empty object rather than +// failing message mapping. +func TestRequestMapper_EmptyToolArguments(t *testing.T) { + t.Parallel() + + mapper := NewRequestMapper(&Config{ + ModelName: "claude-opus-4-5-20250929", + MaxTokens: 4096, + }) + + req := &llm.Request{ + Messages: []llm.Message{ + { + Role: llm.RoleAssistant, + Content: []*llm.Part{ + llm.NewToolRequestPart(&llm.ToolRequest{ + ID: "toolu_noargs", + Name: "list_users", + Arguments: nil, + }), + }, + }, + }, + } + + apiReq, err := mapper.ToProvider(req) + require.NoError(t, err) + require.Len(t, apiReq.Messages, 1) + + toolUse := apiReq.Messages[0].Content[0].OfToolUse + require.NotNil(t, toolUse) + + input, ok := toolUse.Input.(map[string]any) + require.True(t, ok) + assert.Empty(t, input) +} diff --git a/providers/bedrock/request_mapper.go b/providers/bedrock/request_mapper.go index a0aa9347..6ec878b7 100644 --- a/providers/bedrock/request_mapper.go +++ b/providers/bedrock/request_mapper.go @@ -256,17 +256,11 @@ func (rm *RequestMapper) mapAssistantMessage(msg llm.Message) (types.Message, er return apiMsg, errors.New("tool request part has nil 
ToolRequest") } - // Parse arguments to a generic map for document.Interface - var input map[string]any - if err := json.Unmarshal(part.ToolRequest.Arguments, &input); err != nil { - return apiMsg, fmt.Errorf("failed to parse tool arguments: %w", err) - } - apiMsg.Content = append(apiMsg.Content, &types.ContentBlockMemberToolUse{ Value: types.ToolUseBlock{ ToolUseId: aws.String(part.ToolRequest.ID), Name: aws.String(part.ToolRequest.Name), - Input: document.NewLazyDocument(input), + Input: document.NewLazyDocument(part.ToolRequest.ArgumentsAsObject()), }, }) diff --git a/providers/google/request_mapper.go b/providers/google/request_mapper.go index 8415ff91..fccb2669 100644 --- a/providers/google/request_mapper.go +++ b/providers/google/request_mapper.go @@ -206,15 +206,9 @@ func (rm *RequestMapper) mapParts(parts []*llm.Part) ([]*genai.Part, error) { return nil, errors.New("tool request part has nil ToolRequest") } - // Parse arguments as map for function call - var args map[string]any - if err := json.Unmarshal(part.ToolRequest.Arguments, &args); err != nil { - return nil, fmt.Errorf("failed to parse tool arguments: %w", err) - } - geminiPart := genai.NewPartFromFunctionCall( part.ToolRequest.Name, - args, + part.ToolRequest.ArgumentsAsObject(), ) // Restore thought signature preserved from previous response (required for Gemini 3 Pro)