Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 31 additions & 27 deletions backend/packages/harness/deerflow/subagents/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from langchain.agents import create_agent
from langchain.tools import BaseTool
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.runnables import RunnableConfig

from deerflow.agents.thread_state import SandboxState, ThreadDataState, ThreadState
Expand Down Expand Up @@ -269,7 +269,7 @@ def __init__(

logger.info(f"[trace={self.trace_id}] SubagentExecutor initialized: {config.name} with {len(self.tools)} tools")

def _create_agent(self):
def _create_agent(self, system_prompt: str | None = None):
"""Create the agent instance."""
app_config = self.app_config or get_app_config()
if self.model_name is None:
Expand All @@ -285,27 +285,24 @@ def _create_agent(self):
model=model,
tools=self.tools,
middleware=middlewares,
system_prompt=self.config.system_prompt,
system_prompt=self.config.system_prompt if system_prompt is None else system_prompt,
state_schema=ThreadState,
)

async def _load_skill_messages(self) -> list[SystemMessage]:
"""Load skill content as conversation items based on config.skills.
async def _load_skill_prompt(self) -> str:
"""Load skill content into a single prompt block based on config.skills.

Aligned with Codex's pattern: each subagent loads its own skills
per-session and injects them as conversation items (developer messages),
not as system prompt text. The config.skills whitelist controls which
skills are loaded:
The config.skills whitelist controls which skills are loaded:
- None: load all enabled skills
- []: no skills
- ["skill-a", "skill-b"]: only these skills

Returns:
List of SystemMessages containing skill content.
System-prompt text containing preloaded skill content.
"""
if self.config.skills is not None and len(self.config.skills) == 0:
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} skills=[] — skipping skill loading")
return []
return ""

Comment on lines +292 to 306
try:
from deerflow.skills.storage import get_or_new_skill_storage
Expand All @@ -317,11 +314,11 @@ async def _load_skill_messages(self) -> list[SystemMessage]:
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} loaded {len(all_skills)} enabled skills from disk")
except Exception:
logger.warning(f"[trace={self.trace_id}] Failed to load skills for subagent {self.config.name}", exc_info=True)
return []
return ""

if not all_skills:
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} no enabled skills found")
return []
return ""

# Filter by config.skills whitelist
if self.config.skills is not None:
Expand All @@ -331,21 +328,34 @@ async def _load_skill_messages(self) -> list[SystemMessage]:
skills = all_skills

if not skills:
return []
return ""

# Read each skill's SKILL.md content and create conversation items
messages = []
# Read each skill's SKILL.md content and merge it into the single
# system prompt. Do not add SystemMessages to state: OpenAI-compatible
# providers require system messages to appear only at the beginning.
skill_blocks = []
for skill in skills:
try:
content = await asyncio.to_thread(skill.skill_file.read_text, encoding="utf-8")
content = content.strip()
if content:
messages.append(SystemMessage(content=f'<skill name="{skill.name}">\n{content}\n</skill>'))
skill_blocks.append(f'<skill name="{skill.name}">\n{content}\n</skill>')
logger.info(f"[trace={self.trace_id}] Subagent {self.config.name} loaded skill: {skill.name}")
except Exception:
logger.debug(f"[trace={self.trace_id}] Failed to read skill {skill.name}", exc_info=True)

return messages
if not skill_blocks:
return ""

joined_skill_blocks = "\n\n".join(skill_blocks)
return f"<skills>\nThe following skills are preloaded. Follow them when relevant.\n\n{joined_skill_blocks}\n</skills>"

def _build_system_prompt(self, skill_prompt: str) -> str:
"""Build the per-run subagent system prompt."""
if not skill_prompt:
return self.config.system_prompt
base_prompt = self.config.system_prompt.rstrip()
return f"{base_prompt}\n\n{skill_prompt}" if base_prompt else skill_prompt

async def _build_initial_state(self, task: str) -> dict[str, Any]:
"""Build the initial state for agent execution.
Expand All @@ -356,14 +366,7 @@ async def _build_initial_state(self, task: str) -> dict[str, Any]:
Returns:
Initial state dictionary.
"""
# Load skills as conversation items (Codex pattern)
skill_messages = await self._load_skill_messages()

messages: list = []
# Skill content injected as developer/system messages before the task
messages.extend(skill_messages)
# Then the actual task
messages.append(HumanMessage(content=task))
messages = [HumanMessage(content=task)]

state: dict[str, Any] = {
"messages": messages,
Expand Down Expand Up @@ -405,7 +408,8 @@ async def _aexecute(self, task: str, result_holder: SubagentResult | None = None
result.ai_messages = ai_messages

try:
agent = self._create_agent()
skill_prompt = await self._load_skill_prompt()
agent = self._create_agent(system_prompt=self._build_system_prompt(skill_prompt))
state = await self._build_initial_state(task)

# Build config with thread_id for sandbox access and recursion limit
Expand Down
5 changes: 2 additions & 3 deletions backend/packages/harness/deerflow/tools/builtins/task_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,8 @@ async def task_tool(
# Build config overrides
overrides: dict = {}

# Skills are loaded by SubagentExecutor per-session (aligned with Codex's pattern:
# each subagent loads its own skills based on config, injected as conversation items).
# No longer appended to system_prompt here.
# Skills are preloaded by SubagentExecutor per-session and merged into the
# subagent system prompt. They are not appended to the task config here.

if max_turns is not None:
overrides["max_turns"] = max_turns
Expand Down
82 changes: 78 additions & 4 deletions backend/tests/test_subagent_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def fake_create_agent(**kwargs):
assert captured["agent"]["system_prompt"] == base_config.system_prompt

@pytest.mark.anyio
async def test_load_skill_messages_uses_explicit_app_config_for_skill_storage(
async def test_load_skill_prompt_uses_explicit_app_config_for_skill_storage(
self,
classes,
base_config,
Expand Down Expand Up @@ -297,11 +297,11 @@ def fake_get_or_new_skill_storage(*, app_config=None):
thread_id="test-thread",
)

messages = await executor._load_skill_messages()
skill_prompt = await executor._load_skill_prompt()

assert captured["app_config"] is app_config
assert len(messages) == 1
assert "Use demo skill" in messages[0].content
assert '<skill name="demo-skill">' in skill_prompt
assert "Use demo skill" in skill_prompt


# -----------------------------------------------------------------------------
Expand Down Expand Up @@ -1303,3 +1303,77 @@ def test_cleanup_removes_cancelled_task(self, executor_module, classes):
executor_module.cleanup_background_task(task_id)

assert task_id not in executor_module._background_tasks


# -----------------------------------------------------------------------------
# Skill Preload Tests
# -----------------------------------------------------------------------------


class TestSkillPreload:
@pytest.mark.anyio
async def test_build_initial_state_keeps_only_task_human_message(self, classes, base_config):
    """Initial state should not inject skill content as extra system messages."""
    from langchain_core.messages import HumanMessage

    executor_cls = classes["SubagentExecutor"]
    executor = executor_cls(config=base_config, tools=[], thread_id="test-thread")

    state = await executor._build_initial_state("Do the task")

    messages = state["messages"]
    # Exactly one message: the task itself, as a HumanMessage.
    assert len(messages) == 1
    only = messages[0]
    assert type(only) is HumanMessage
    assert only.content == "Do the task"

@pytest.mark.anyio
async def test_preloaded_skills_are_sent_in_the_leading_system_message(self, classes, base_config, monkeypatch):
    """Preloaded skills must reach the model without creating later SystemMessages."""
    from langchain_core.language_models.fake_chat_models import FakeListChatModel
    from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
    from langchain_core.outputs import ChatGeneration, ChatResult

    executor_cls = classes["SubagentExecutor"]
    seen_batches = []

    class RecordingChatModel(FakeListChatModel):
        # Capture every message batch the agent sends, then reply "done".
        def _generate(self, messages, stop=None, run_manager=None, **kwargs):
            seen_batches.append(messages)
            reply = AIMessage(content="done")
            return ChatResult(generations=[ChatGeneration(message=reply)])

    demo_skill = SimpleNamespace(
        name="demo-skill",
        description="Demo workflow",
        skill_file=SimpleNamespace(read_text=MagicMock(return_value="Demo skill instructions")),
    )
    subagent_config = classes["SubagentConfig"](
        name=base_config.name,
        description=base_config.description,
        system_prompt=base_config.system_prompt,
        skills=["demo-skill"],
        max_turns=base_config.max_turns,
        timeout_seconds=base_config.timeout_seconds,
    )
    executor = executor_cls(config=subagent_config, tools=[], thread_id="test-thread")

    # Stub out runtime middlewares, state schema, and the chat-model factory.
    stub_middlewares = ModuleType("deerflow.agents.middlewares.tool_error_handling_middleware")
    stub_middlewares.build_subagent_runtime_middlewares = lambda **kwargs: []
    monkeypatch.setitem(sys.modules, "deerflow.agents.middlewares.tool_error_handling_middleware", stub_middlewares)
    monkeypatch.setattr("deerflow.subagents.executor.ThreadState", None)
    monkeypatch.setattr(
        "deerflow.subagents.executor.create_chat_model",
        lambda **kwargs: RecordingChatModel(responses=["done"]),
    )

    def stub_skill_storage(*, app_config=None):
        return SimpleNamespace(load_skills=lambda *, enabled_only: [demo_skill])

    monkeypatch.setattr("deerflow.skills.storage.get_or_new_skill_storage", stub_skill_storage)

    with patch.object(demo_skill.skill_file, "read_text", return_value="Demo skill instructions"):
        outcome = await executor._aexecute("Do the task")

    assert outcome.result == "done"
    assert seen_batches
    first_batch = seen_batches[0]
    # The only SystemMessage must be the leading one, carrying base prompt + skills.
    system_positions = [pos for pos, msg in enumerate(first_batch) if isinstance(msg, SystemMessage)]
    assert system_positions == [0]
    leading = first_batch[0].content
    assert base_config.system_prompt in leading
    assert '<skill name="demo-skill">' in leading
    assert "Demo skill instructions" in leading
    human_contents = [msg.content for msg in first_batch if isinstance(msg, HumanMessage)]
    assert human_contents == ["Do the task"]
4 changes: 2 additions & 2 deletions backend/tests/test_task_tool_core_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ def execute_async(self, prompt, task_id=None):
assert captured["executor_kwargs"]["thread_id"] == "thread-1"
assert captured["executor_kwargs"]["parent_model"] == "ark-model"
assert captured["executor_kwargs"]["config"].max_turns == 7
# Skills are no longer appended to system_prompt; they are loaded per-session
# by SubagentExecutor and injected as conversation items (Codex pattern).
# Skills are loaded per-session by SubagentExecutor and merged into the
# runtime system prompt, not appended to the stored config here.
assert captured["executor_kwargs"]["config"].system_prompt == "Base system prompt"

get_available_tools.assert_called_once_with(model_name="ark-model", groups=None, subagent_enabled=False)
Expand Down