diff --git a/backend/packages/harness/deerflow/subagents/executor.py b/backend/packages/harness/deerflow/subagents/executor.py index 2fe5c05dc0..9f26340f3c 100644 --- a/backend/packages/harness/deerflow/subagents/executor.py +++ b/backend/packages/harness/deerflow/subagents/executor.py @@ -16,7 +16,7 @@ from langchain.agents import create_agent from langchain.tools import BaseTool -from langchain_core.messages import AIMessage, HumanMessage, SystemMessage +from langchain_core.messages import AIMessage, HumanMessage from langchain_core.runnables import RunnableConfig from deerflow.agents.thread_state import SandboxState, ThreadDataState, ThreadState @@ -269,7 +269,7 @@ def __init__( logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools") - def _create_agent(self): + def _create_agent(self, system_prompt: str | None = None): """Create the agent instance.""" app_config = self.app_config or get_app_config() if self.model_name is None: @@ -285,27 +285,24 @@ def _create_agent(self): model=model, tools=self.tools, middleware=middlewares, - system_prompt=self.config.system_prompt, + system_prompt=self.config.system_prompt if system_prompt is None else system_prompt, state_schema=ThreadState, ) - async def _load_skill_messages(self) -> list[SystemMessage]: - """Load skill content as conversation items based on config.skills. + async def _load_skill_prompt(self) -> str: + """Load skill content into a single prompt block based on config.skills. - Aligned with Codex's pattern: each subagent loads its own skills - per-session and injects them as conversation items (developer messages), - not as system prompt text. The config.skills whitelist controls which - skills are loaded: + The config.skills whitelist controls which skills are loaded: - None: load all enabled skills - []: no skills - ["skill-a", "skill-b"]: only these skills Returns: - List of SystemMessages containing skill content. + System-prompt text containing preloaded skill content. """ if self.config.skills is not None and len(self.config.skills) == 0: logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} skills=[] — skipping skill loading") - return [] + return "" try: from deerflow.skills.storage import get_or_new_skill_storage @@ -317,11 +314,11 @@ async def _load_skill_messages(self) -> list[SystemMessage]: logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} loaded {len(all_skills)} enabled skills from disk") except Exception: logger.warning(f"[trace={self.trace_id}] Failed to load skills for subagent {self.config.name}", exc_info=True) - return [] + return "" if not all_skills: logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} no enabled skills found") - return [] + return "" # Filter by config.skills whitelist if self.config.skills is not None: @@ -331,21 +328,34 @@ async def _load_skill_messages(self) -> list[SystemMessage]: skills = all_skills if not skills: - return [] + return "" - # Read each skill's SKILL.md content and create conversation items - messages = [] + # Read each skill's SKILL.md content and merge it into the single + # system prompt. Do not add SystemMessages to state: OpenAI-compatible + # providers require system messages to appear only at the beginning. + skill_blocks = [] for skill in skills: try: content = await asyncio.to_thread(skill.skill_file.read_text, encoding="utf-8") content = content.strip() if content: - messages.append(SystemMessage(content=f'\n{content}\n')) + skill_blocks.append(f'\n{content}\n') logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} loaded skill: {skill.name}") except Exception: logger.debug(f"[trace={self.trace_id}] Failed to read skill {skill.name}", exc_info=True) - return messages + if not skill_blocks: + return "" + + joined_skill_blocks = "\n\n".join(skill_blocks) + return f"\nThe following skills are preloaded. Follow them when relevant.\n\n{joined_skill_blocks}\n" + + def _build_system_prompt(self, skill_prompt: str) -> str: + """Build the per-run subagent system prompt.""" + if not skill_prompt: + return self.config.system_prompt + base_prompt = self.config.system_prompt.rstrip() + return f"{base_prompt}\n\n{skill_prompt}" if base_prompt else skill_prompt async def _build_initial_state(self, task: str) -> dict[str, Any]: """Build the initial state for agent execution. @@ -356,14 +366,7 @@ async def _build_initial_state(self, task: str) -> dict[str, Any]: Returns: Initial state dictionary. """ - # Load skills as conversation items (Codex pattern) - skill_messages = await self._load_skill_messages() - - messages: list = [] - # Skill content injected as developer/system messages before the task - messages.extend(skill_messages) - # Then the actual task - messages.append(HumanMessage(content=task)) + messages = [HumanMessage(content=task)] state: dict[str, Any] = { "messages": messages, @@ -405,7 +408,8 @@ async def _aexecute(self, task: str, result_holder: SubagentResult | None = None result.ai_messages = ai_messages try: - agent = self._create_agent() + skill_prompt = await self._load_skill_prompt() + agent = self._create_agent(system_prompt=self._build_system_prompt(skill_prompt)) state = await self._build_initial_state(task) # Build config with thread_id for sandbox access and recursion limit diff --git a/backend/packages/harness/deerflow/tools/builtins/task_tool.py b/backend/packages/harness/deerflow/tools/builtins/task_tool.py index 1328507b2a..aea9fc9b12 100644 --- a/backend/packages/harness/deerflow/tools/builtins/task_tool.py +++ b/backend/packages/harness/deerflow/tools/builtins/task_tool.py @@ -109,9 +109,8 @@ async def task_tool( # Build config overrides overrides: dict = {} - # Skills are loaded by SubagentExecutor per-session (aligned with Codex's pattern: - # each subagent loads its own skills based on config, injected as conversation items). - # No longer appended to system_prompt here. + # Skills are preloaded by SubagentExecutor per-session and merged into the + # subagent system prompt. They are not appended to the task config here. if max_turns is not None: overrides["max_turns"] = max_turns diff --git a/backend/tests/test_subagent_executor.py b/backend/tests/test_subagent_executor.py index 102ac091a4..c8f25e31e1 100644 --- a/backend/tests/test_subagent_executor.py +++ b/backend/tests/test_subagent_executor.py @@ -267,7 +267,7 @@ def fake_create_agent(**kwargs): assert captured["agent"]["system_prompt"] == base_config.system_prompt @pytest.mark.anyio - async def test_load_skill_messages_uses_explicit_app_config_for_skill_storage( + async def test_load_skill_prompt_uses_explicit_app_config_for_skill_storage( self, classes, base_config, @@ -297,11 +297,11 @@ def fake_get_or_new_skill_storage(*, app_config=None): thread_id="test-thread", ) - messages = await executor._load_skill_messages() + skill_prompt = await executor._load_skill_prompt() assert captured["app_config"] is app_config - assert len(messages) == 1 - assert "Use demo skill" in messages[0].content + assert '' in skill_prompt + assert "Use demo skill" in skill_prompt # ----------------------------------------------------------------------------- @@ -1303,3 +1303,77 @@ def test_cleanup_removes_cancelled_task(self, executor_module, classes): executor_module.cleanup_background_task(task_id) assert task_id not in executor_module._background_tasks + + +# ----------------------------------------------------------------------------- +# Skill Preload Tests +# ----------------------------------------------------------------------------- + + +class TestSkillPreload: + @pytest.mark.anyio + async def test_build_initial_state_keeps_only_task_human_message(self, classes, base_config): + """Initial state should not inject skill content as extra system messages.""" + from langchain_core.messages import HumanMessage + + SubagentExecutor = classes["SubagentExecutor"] + executor = SubagentExecutor(config=base_config, tools=[], thread_id="test-thread") + + state = await executor._build_initial_state("Do the task") + + assert [type(message) for message in state["messages"]] == [HumanMessage] + assert [message.content for message in state["messages"]] == ["Do the task"] + + @pytest.mark.anyio + async def test_preloaded_skills_are_sent_in_the_leading_system_message(self, classes, base_config, monkeypatch): + """Preloaded skills must reach the model without creating later SystemMessages.""" + from langchain_core.language_models.fake_chat_models import FakeListChatModel + from langchain_core.messages import AIMessage, HumanMessage, SystemMessage + from langchain_core.outputs import ChatGeneration, ChatResult + + SubagentExecutor = classes["SubagentExecutor"] + captured_messages = [] + + class CapturingChatModel(FakeListChatModel): + def _generate(self, messages, stop=None, run_manager=None, **kwargs): + captured_messages.append(messages) + return ChatResult(generations=[ChatGeneration(message=AIMessage(content="done"))]) + + skill = SimpleNamespace( + name="demo-skill", + description="Demo workflow", + skill_file=SimpleNamespace(read_text=MagicMock(return_value="Demo skill instructions")), + ) + config = classes["SubagentConfig"]( + name=base_config.name, + description=base_config.description, + system_prompt=base_config.system_prompt, + skills=["demo-skill"], + max_turns=base_config.max_turns, + timeout_seconds=base_config.timeout_seconds, + ) + executor = SubagentExecutor(config=config, tools=[], thread_id="test-thread") + + middleware_module = ModuleType("deerflow.agents.middlewares.tool_error_handling_middleware") + middleware_module.build_subagent_runtime_middlewares = lambda **kwargs: [] + monkeypatch.setitem(sys.modules, "deerflow.agents.middlewares.tool_error_handling_middleware", middleware_module) + monkeypatch.setattr("deerflow.subagents.executor.ThreadState", None) + monkeypatch.setattr("deerflow.subagents.executor.create_chat_model", lambda **kwargs: CapturingChatModel(responses=["done"])) + + def fake_get_or_new_skill_storage(*, app_config=None): + return SimpleNamespace(load_skills=lambda *, enabled_only: [skill]) + + monkeypatch.setattr("deerflow.skills.storage.get_or_new_skill_storage", fake_get_or_new_skill_storage) + + with patch.object(skill.skill_file, "read_text", return_value="Demo skill instructions"): + result = await executor._aexecute("Do the task") + + assert result.result == "done" + assert captured_messages + messages = captured_messages[0] + system_indexes = [index for index, message in enumerate(messages) if isinstance(message, SystemMessage)] + assert system_indexes == [0] + assert base_config.system_prompt in messages[0].content + assert '' in messages[0].content + assert "Demo skill instructions" in messages[0].content + assert [message.content for message in messages if isinstance(message, HumanMessage)] == ["Do the task"] diff --git a/backend/tests/test_task_tool_core_logic.py b/backend/tests/test_task_tool_core_logic.py index 428b7a066c..9a1039a2e9 100644 --- a/backend/tests/test_task_tool_core_logic.py +++ b/backend/tests/test_task_tool_core_logic.py @@ -230,8 +230,8 @@ def execute_async(self, prompt, task_id=None): assert captured["executor_kwargs"]["thread_id"] == "thread-1" assert captured["executor_kwargs"]["parent_model"] == "ark-model" assert captured["executor_kwargs"]["config"].max_turns == 7 - # Skills are no longer appended to system_prompt; they are loaded per-session - # by SubagentExecutor and injected as conversation items (Codex pattern). + # Skills are loaded per-session by SubagentExecutor and merged into the + # runtime system prompt, not appended to the stored config here. assert captured["executor_kwargs"]["config"].system_prompt == "Base system prompt" get_available_tools.assert_called_once_with(model_name="ark-model", groups=None, subagent_enabled=False)