Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions agents-api/src/__tests__/run/agents/Agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ const {
getToolsForAgentMock,
getFunctionToolsForSubAgentMock,
buildPersistedMessageContentMock,
createDefaultConversationHistoryConfigMock,
getFormattedConversationHistoryMock,
getConversationHistoryWithCompressionMock,
} = vi.hoisted(() => {
const getCredentialReferenceMock = vi.fn(() => vi.fn().mockResolvedValue(null));
const getContextConfigByIdMock = vi.fn(() => vi.fn().mockResolvedValue(null));
Expand All @@ -159,6 +162,17 @@ const {
);
const getFunctionToolsForSubAgentMock = vi.fn().mockResolvedValue([]);
const buildPersistedMessageContentMock = vi.fn();
const createDefaultConversationHistoryConfigMock = vi.fn().mockReturnValue({
mode: 'full',
limit: 50,
includeInternal: true,
messageTypes: ['chat'],
maxOutputTokens: 4000,
});
const getFormattedConversationHistoryMock = vi
.fn()
.mockResolvedValue('Mock conversation history');
const getConversationHistoryWithCompressionMock = vi.fn().mockResolvedValue([]);

return {
getCredentialReferenceMock,
Expand All @@ -170,6 +184,9 @@ const {
getToolsForAgentMock,
getFunctionToolsForSubAgentMock,
buildPersistedMessageContentMock,
createDefaultConversationHistoryConfigMock,
getFormattedConversationHistoryMock,
getConversationHistoryWithCompressionMock,
};
});

Expand Down Expand Up @@ -229,9 +246,9 @@ vi.mock('../../../domains/run/data/conversations', async (importOriginal) => {
const actual = (await importOriginal()) as any;
return {
...actual,
getConversationHistoryWithCompression: vi
.fn()
.mockResolvedValue('Mock conversation history as string'),
createDefaultConversationHistoryConfig: createDefaultConversationHistoryConfigMock,
getFormattedConversationHistory: getFormattedConversationHistoryMock,
getConversationHistoryWithCompression: getConversationHistoryWithCompressionMock,
};
});

Expand Down Expand Up @@ -362,20 +379,15 @@ vi.mock('../../../domains/run/agents/SystemPromptBuilder.js', () => ({
})),
}));

vi.mock('../../../domains/run/data/conversations.js', () => ({
createDefaultConversationHistoryConfig: vi.fn().mockReturnValue({
mode: 'full',
limit: 50,
includeInternal: true,
messageTypes: ['chat'],
maxOutputTokens: 4000,
}),
getFormattedConversationHistory: vi.fn().mockResolvedValue('Mock conversation history'),
getConversationScopedArtifacts: vi.fn().mockResolvedValue([]),
getConversationHistoryWithCompression: vi
.fn()
.mockResolvedValue('Mock conversation history as string'),
}));
// Partial mock of the conversations data module: every real export is kept via
// importOriginal(), and only the three helpers listed below are replaced with the
// mocks declared in the vi.hoisted() block.
vi.mock('../../../domains/run/data/conversations.js', async (importOriginal) => {
// Load the real module so exports that are not overridden below are passed through unchanged.
const actual = (await importOriginal()) as any;
return {
...actual,
createDefaultConversationHistoryConfig: createDefaultConversationHistoryConfigMock,
getFormattedConversationHistory: getFormattedConversationHistoryMock,
getConversationHistoryWithCompression: getConversationHistoryWithCompressionMock,
};
});

// Import the mocked module - these will automatically be mocked
import {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,15 @@ describe('getConversationHistoryWithCompression — artifact replacement', () =>
);

const result = await getConversationHistoryWithCompression(baseParams);

expect(result).toContain('Artifact: "Google Doc"');
expect(result).toContain('id: art-1');
expect(result).toContain('args:');
expect(result).toContain('description: Fetched document content');
expect(result).toContain('summary:');
expect(result).not.toContain(rawContent);
const toolResult = result.find((msg) => msg.messageType === 'tool-result');
const toolResultText = toolResult?.content?.text ?? '';

expect(toolResultText).toContain('Artifact: "Google Doc"');
expect(toolResultText).toContain('id: art-1');
expect(toolResultText).toContain('args:');
expect(toolResultText).toContain('description: Fetched document content');
expect(toolResultText).toContain('summary:');
expect(toolResultText).not.toContain(rawContent);
});

it('batches toolCallId lookups in a single getLedgerArtifacts call', async () => {
Expand Down Expand Up @@ -133,8 +135,10 @@ describe('getConversationHistoryWithCompression — artifact replacement', () =>
mockGetLedgerArtifacts.mockReturnValue(vi.fn().mockResolvedValue([]));

const result = await getConversationHistoryWithCompression(baseParams);
const toolResult = result.find((msg) => msg.messageType === 'tool-result');
const toolResultText = toolResult?.content?.text ?? '';

expect(result).toContain(content);
expect(result).not.toContain('Artifact:');
expect(toolResultText).toContain(content);
expect(toolResultText).not.toContain('Artifact:');
});
});
3 changes: 3 additions & 0 deletions agents-api/src/domains/run/agents/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type { ToolSet } from 'ai';
import { ArtifactReferenceSchema } from '../artifacts/artifact-component-schema';
import { ContextResolver } from '../context';
import { createDefaultConversationHistoryConfig } from '../data/conversations';
import { createBlobToDataUrlHydrator } from '../services/blob-storage';
import type { StreamHelper } from '../stream/stream-helpers';
import {
type AgentConfig,
Expand Down Expand Up @@ -110,6 +111,8 @@ export class Agent {
taskDenialRedirects: [],
};

ctx.hydrateBlobToDataUrl = createBlobToDataUrlHydrator();

ctx.mcpManager = new AgentMcpManager(
processedConfig,
executionContext,
Expand Down
3 changes: 3 additions & 0 deletions agents-api/src/domains/run/agents/agent-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
import type { FinishReason, StepResult, ToolSet } from 'ai';
import type { MidGenerationCompressor } from '../compression/MidGenerationCompressor';
import type { ContextResolver } from '../context';
import type { HydrateBlobToDataUrl } from '../services/blob-storage';
import type { StreamHelper } from '../stream/stream-helpers';
import type { ImageDetail } from '../types/chat';
import type { SandboxConfig } from '../types/executionContext';
Expand Down Expand Up @@ -235,4 +236,6 @@ export interface AgentRunContext {
currentCompressor: MidGenerationCompressor | null;
functionToolRelationshipIdByName: Map<string, string>;
taskDenialRedirects: Array<{ toolName: string; toolCallId: string; reason: string }>;
/** When set, conversation history image parts with blob URIs are hydrated to data URLs (e.g. when the model supports images). */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't need the comment here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — the comment restates what the type name already conveys. Safe to remove.

Pullfrog | View workflow run | Using Claude Code | Triggered by Pullfrog | pullfrog.com | 𝕏

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, where is the check for whether the model supports images?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code, there isn't one — ctx.hydrateBlobToDataUrl is set unconditionally in Agent.ts:114 via createBlobToDataUrlHydrator(), so hydration runs for every agent regardless of whether the model supports images. Today that's probably fine if all models in use do support images, but if a text-only model is ever configured, it would receive data-URL image parts it can't process. Worth gating on a model capability check (or at least a config flag) before this ships more broadly.

Pullfrog | View workflow run | Using Claude Code | Triggered by Pullfrog | pullfrog.com | 𝕏

hydrateBlobToDataUrl?: HydrateBlobToDataUrl;
}
155 changes: 136 additions & 19 deletions agents-api/src/domains/run/agents/generation/conversation-history.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import type { FilePart } from '@inkeep/agents-core';
import type { FilePart, MessageSelect } from '@inkeep/agents-core';
import type { ModelMessage } from 'ai';
import {
createDefaultConversationHistoryConfig,
formatMessagesAsConversationHistory,
getConversationHistoryWithCompression,
} from '../../data/conversations';
import type { HydrateBlobToDataUrl } from '../../services/blob-storage';
import { isBlobUri } from '../../services/blob-storage';
import {
type ContextBreakdown,
calculateBreakdownTotal,
Expand All @@ -11,15 +15,69 @@ import {
import type { AgentRunContext, AiSdkContentPart } from '../agent-types';
import { getPrimaryModel, getSummarizerModel } from './model-config';

export async function buildConversationHistory(
ctx: AgentRunContext,
contextId: string,
taskId: string,
userMessage: string,
streamRequestId: string | undefined,
initialContextBreakdown: ContextBreakdown
): Promise<{ conversationHistory: string; contextBreakdown: ContextBreakdown }> {
let conversationHistory = '';
export async function hydrateConversationHistoryBlobParts(
messages: MessageSelect[],
hydrate: HydrateBlobToDataUrl
): Promise<{ hydrated: MessageSelect[]; nonHydrated: MessageSelect[] }> {
const hydratedResults = await Promise.all(
messages.map(async (msg): Promise<{ message: MessageSelect; hydrated: boolean }> => {
const content = msg.content;
if (!content?.parts?.length) {
return { message: msg, hydrated: false };
}

let hasHydratedParts = false;
const parts = await Promise.all(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟠 MAJOR: Unbounded parallel blob downloads may exhaust connections

Issue: The nested Promise.all at lines 22-49 downloads all blob URIs concurrently without any concurrency limit. With up to 50 messages in conversation history, each potentially containing multiple image parts, this could trigger dozens of concurrent downloads.

Why: The upload path uses FILE_UPLOAD_CONCURRENCY = 3 (image-upload.ts:13) for good reason — to avoid connection pool exhaustion and rate limiting. Under load, unbounded downloads could:

  • Exhaust connection pools to S3/Vercel Blob storage
  • Trigger rate limiting (especially Vercel Blob with per-second limits)
  • Create memory pressure from buffering many large images simultaneously

Fix: Apply the same worker pool pattern used in image-upload.ts with 3-5 concurrent downloads. Consider using p-limit:

import pLimit from 'p-limit';
const limit = pLimit(3);

const parts = await Promise.all(
  content.parts.map((part) => limit(async () => {
    // existing hydration logic
  }))
);

Refs:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think bounding this with a concurrency limit is valid.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Using p-limit with a concurrency of 3–5 (matching the FILE_UPLOAD_CONCURRENCY = 3 pattern in image-upload.ts) would be the cleanest approach. The inner Promise.all over the parts within a single message is fine to leave unbounded (a message typically has 1–3 parts), but the outer Promise.all over all messages should be bounded, since conversation history can contain dozens of messages with images.

Pullfrog | View workflow run | Using Claude Code | Triggered by Pullfrog | pullfrog.com | 𝕏

content.parts.map(async (part: NonNullable<MessageSelect['content']['parts']>[number]) => {
if (part.kind !== 'file' || typeof part.data !== 'string' || !isBlobUri(part.data)) {
return part;
}
const mimeType =
typeof part.metadata?.mimeType === 'string' ? part.metadata.mimeType : undefined;
const dataUrl = await hydrate(part.data, mimeType);
if (dataUrl) {
hasHydratedParts = true;
return { ...part, data: dataUrl };
}
return part;
})
);

const processedMsg = { ...msg, content: { ...content, parts } };
return { message: processedMsg, hydrated: hasHydratedParts };
})
);

const hydrated = hydratedResults
.filter((result) => result.hydrated)
.map((result) => result.message);
const nonHydrated = hydratedResults
.filter((result) => !result.hydrated)
.map((result) => result.message);

return { hydrated, nonHydrated };
}

export async function buildConversationHistory({
ctx,
contextId,
taskId,
userMessage,
streamRequestId,
initialContextBreakdown,
}: {
ctx: AgentRunContext;
contextId: string;
taskId: string;
userMessage: string;
streamRequestId: string | undefined;
initialContextBreakdown: ContextBreakdown;
}): Promise<{
conversationHistoryWithFileData: MessageSelect[];
conversationHistoryString: string;
contextBreakdown: ContextBreakdown;
}> {
let conversationHistory: MessageSelect[] = [];
const historyConfig =
ctx.config.conversationHistoryConfig ?? createDefaultConversationHistoryConfig();

Expand Down Expand Up @@ -63,7 +121,32 @@ export async function buildConversationHistory(
}
}

const conversationHistoryTokens = estimateTokens(conversationHistory);
let conversationHistoryWithFileData: MessageSelect[] = [];
if (ctx.hydrateBlobToDataUrl && conversationHistory.length > 0) {
const { hydrated, nonHydrated } = await hydrateConversationHistoryBlobParts(
conversationHistory,
ctx.hydrateBlobToDataUrl
);
conversationHistoryWithFileData = hydrated;
conversationHistory = nonHydrated;
}

const conversationHistoryString = formatMessagesAsConversationHistory(conversationHistory);

const hydratedHistoryWithFileData = conversationHistoryWithFileData
.flatMap((msg) => msg.content?.parts ?? [])
.map((part) => {
if (part.kind === 'text' && typeof part.text === 'string') {
return part.text;
}
if (part.kind === 'file' && typeof part.data === 'string') {
return part.data;
}
return '';
})
.join('\n');
const conversationHistoryTokens =
estimateTokens(conversationHistoryString) + estimateTokens(hydratedHistoryWithFileData);
const updatedContextBreakdown: ContextBreakdown = {
components: {
...initialContextBreakdown.components,
Expand All @@ -74,22 +157,56 @@ export async function buildConversationHistory(

calculateBreakdownTotal(updatedContextBreakdown);

return { conversationHistory, contextBreakdown: updatedContextBreakdown };
return {
conversationHistoryWithFileData,
conversationHistoryString,
contextBreakdown: updatedContextBreakdown,
};
}

export function buildInitialMessages(
systemPrompt: string,
conversationHistory: string,
userMessage: string,
imageParts?: FilePart[]
): any[] {
const messages: any[] = [];
export function buildInitialMessages({
systemPrompt,
conversationHistory,
userMessage,
imageParts,
conversationHistoryWithFileData,
}: {
systemPrompt: string;
conversationHistory: string;
userMessage: string;
imageParts?: FilePart[];
conversationHistoryWithFileData?: MessageSelect[];
}): ModelMessage[] {
const messages: ModelMessage[] = [];
messages.push({ role: 'system', content: systemPrompt });

if (conversationHistory.trim() !== '') {
messages.push({ role: 'user', content: conversationHistory });
}

if (conversationHistoryWithFileData?.length) {
conversationHistoryWithFileData.forEach((msg) => {
const content = (msg.content?.parts ?? []).reduce<AiSdkContentPart[]>((acc, part) => {
if (part.kind === 'text' && typeof part.text === 'string') {
acc.push({ type: 'text', text: part.text });
return acc;
}

if (part.kind === 'file' && typeof part.data === 'string') {
// Temporarily hard code to image. Must update when we add other file types.
acc.push({ type: 'image', image: part.data });
return acc;
}
Comment on lines +195 to +199
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Minor: Hardcoded assumption that all file parts are images

Issue: The comment acknowledges this ("Temporarily hard code to image. Must update when we add other file types."), but the type system allows any kind: 'file' part to reach this code path, including PDFs or other binary data.

Why: If a non-image file part is stored in conversation history, it will be sent to the model as an image, which will likely cause API errors or undefined behavior.

Fix: Add a type guard for image MIME types:

Suggested change
if (part.kind === 'file' && typeof part.data === 'string') {
// Temporarily hard code to image. Must update when we add other file types.
acc.push({ type: 'image', image: part.data });
return acc;
}
if (part.kind === 'file' && typeof part.data === 'string') {
const mimeType = part.metadata?.mimeType;
if (typeof mimeType === 'string' && mimeType.startsWith('image/')) {
acc.push({ type: 'image', image: part.data });
}
return acc;
}

Refs:

  • PR title — Feature is specifically for "image context parts"


return acc;
}, []);

if (content?.length && content.length > 0) {
messages.push({ role: 'user', content });
}
});
Comment on lines +187 to +207
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Text context is dropped. Hydrated messages only extract file parts — all text parts from the same message are discarded via flatMap. The model will receive image content with no accompanying text from that original message (e.g. a user's description of the image). Consider also including text parts so the model can associate each image with its context.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mike-inkeep Is this true?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was true in the original code but has been fixed in the "Review feedback" commit. Looking at the current buildInitialMessages (line ~189), the reduce now handles both text and file parts — text parts are included as { type: 'text', text: part.text } alongside the image parts. So text context is preserved in the hydrated messages now.

Pullfrog | View workflow run | Using Claude Code | Triggered by Pullfrog | pullfrog.com | 𝕏

}

const userContent = buildUserMessageContent(userMessage, imageParts);
messages.push({
role: 'user',
Expand Down
26 changes: 14 additions & 12 deletions agents-api/src/domains/run/agents/generation/generate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,15 @@ export async function runGenerate(
contextBreakdown: initialContextBreakdown,
} = await loadToolsAndPrompts(ctx, sessionId, streamRequestId || undefined, runtimeContext);

const { conversationHistory, contextBreakdown } = await buildConversationHistory(
ctx,
contextId,
taskId,
userMessage,
streamRequestId || undefined,
initialContextBreakdown
);
const { conversationHistoryString, contextBreakdown, conversationHistoryWithFileData } =
await buildConversationHistory({
ctx,
contextId,
taskId,
userMessage,
streamRequestId: streamRequestId || undefined,
initialContextBreakdown,
});

const breakdownAttributes: Record<string, number> = {};
for (const componentDef of V1_BREAKDOWN_SCHEMA) {
Expand All @@ -203,12 +204,13 @@ export async function runGenerate(
let response: ResolvedGenerationResponse;
let textResponse: string;

const messages = buildInitialMessages(
const messages = buildInitialMessages({
systemPrompt,
conversationHistory,
conversationHistory: conversationHistoryString,
conversationHistoryWithFileData,
userMessage,
imageParts
);
imageParts,
});

const { originalMessageCount, compressor } = setupCompression(
ctx,
Expand Down
Loading