Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 78 additions & 14 deletions src/magentic_ui/teams/orchestrator/_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
get_orchestrator_system_message_planning,
get_orchestrator_system_message_planning_autonomous,
get_orchestrator_plan_prompt_json,
get_orchestrator_enhanced_plan_prompt_json,
get_orchestrator_plan_replan_json,
get_orchestrator_progress_ledger_prompt,
ORCHESTRATOR_SYSTEM_MESSAGE_EXECUTION,
Expand All @@ -58,6 +59,7 @@
INSTRUCTION_AGENT_FORMAT,
validate_ledger_json,
validate_plan_json,
validate_enhanced_plan_json,
)
from ._utils import is_accepted_str, extract_json_from_string
from loguru import logger as trace_logger
Expand Down Expand Up @@ -182,6 +184,8 @@ def __init__(
# Setup internal variables
self._setup_internals()

self.enhanced_plan = True

def _setup_internals(self) -> None:
"""
Setup internal variables used in orchestrator
Expand Down Expand Up @@ -252,6 +256,18 @@ def _get_task_ledger_plan_prompt(self, team: str) -> str:
team=team, additional_instructions=additional_instructions
)

def _get_task_ledger_plan_prompt_enhanced(self, team: str, user_query: str) -> str:
additional_instructions = ""
if self._config.allowed_websites is not None:
additional_instructions = (
"Only use the following websites if possible: "
+ ", ".join(self._config.allowed_websites)
)

return get_orchestrator_enhanced_plan_prompt_json().format(
team=team, additional_instructions=additional_instructions,user_query=user_query
)

def _get_task_ledger_replan_plan_prompt(
self, task: str, team: str, plan: str
) -> str:
Expand Down Expand Up @@ -315,6 +331,9 @@ def _validate_ledger_json(self, json_response: Dict[str, Any]) -> bool:

def _validate_plan_json(self, json_response: Dict[str, Any]) -> bool:
return validate_plan_json(json_response, self._config.sentinel_tasks)

def _validate_enhanced_plan_json(self, json_response: Dict[str, Any]) -> bool:
return validate_enhanced_plan_json(json_response)

async def validate_group_state(
self, messages: List[BaseChatMessage] | None
Expand Down Expand Up @@ -394,6 +413,25 @@ async def _request_next_speaker(
cancellation_token=cancellation_token,
)

async def change_format(self,response):
output = {
"terms": response.get('terms', ''),
'response': response.get('response', ''),
'task': response.get('task', {}),
'plan_summary': response.get('plan_summary', ''),
'needs_plan': response.get('needs_plan', False),
'steps': []
}
for step in response.get('steps', {}).values():
agent_name = step.get('agent_name', '')
for substep in step.get('substeps', {}).values():
output['steps'].append({
'title': substep['title'],
'details': substep['details'],
'agent_name': agent_name
})
return output

async def _get_json_response(
self,
messages: List[LLMMessage],
Expand Down Expand Up @@ -757,15 +795,28 @@ async def _orchestrate_step_planning(
await self._handle_relevant_plan_from_memory(context=context)

# create a first plan
context.append(
if self.enhanced_plan:
user_query = context[1].content
context.append(
UserMessage(
content=self._get_task_ledger_plan_prompt(self._team_description),
content=self._get_task_ledger_plan_prompt_enhanced(self._team_description,user_query),
source=self._name,
))
plan_response = await self._get_json_response(
context, self._validate_enhanced_plan_json, cancellation_token
)
)
plan_response = await self._get_json_response(
context, self._validate_plan_json, cancellation_token
)
plan_response = await self.change_format(plan_response)
else:
context.append(
UserMessage(
content=self._get_task_ledger_plan_prompt(self._team_description),
source=self._name,
)
)
plan_response = await self._get_json_response(
context, self._validate_plan_json, cancellation_token
)

if self._state.is_paused:
# let user speak next if paused
await self._request_next_speaker(
Expand Down Expand Up @@ -817,17 +868,30 @@ async def _orchestrate_step_planning(
)
if self._config.retrieve_relevant_plans == "hint":
await self._handle_relevant_plan_from_memory(context=context)
context.append(

if self.enhanced_plan:
user_query = context[1].content
context.append(
UserMessage(
content=self._get_task_ledger_plan_prompt(
self._team_description
),
content=self._get_task_ledger_plan_prompt_enhanced(self._team_description,user_query),
Copy link
Preview

Copilot AI Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space after comma in function call. Should be: self._get_task_ledger_plan_prompt_enhanced(self._team_description, user_query)

Suggested change
content=self._get_task_ledger_plan_prompt_enhanced(self._team_description,user_query),
content=self._get_task_ledger_plan_prompt_enhanced(self._team_description, user_query),

Copilot uses AI. Check for mistakes.

source=self._name,
))
plan_response = await self._get_json_response(
context, self._validate_enhanced_plan_json, cancellation_token
)
plan_response = await self.change_format(plan_response)
else:
context.append(
UserMessage(
content=self._get_task_ledger_plan_prompt(
self._team_description
),
source=self._name,
)
)
plan_response = await self._get_json_response(
context, self._validate_plan_json, cancellation_token
)
)
plan_response = await self._get_json_response(
context, self._validate_plan_json, cancellation_token
)
if self._state.is_paused:
# let user speak next if paused
await self._request_next_speaker(
Expand Down
84 changes: 84 additions & 0 deletions src/magentic_ui/teams/orchestrator/_prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,81 @@ def get_orchestrator_plan_prompt_json(sentinel_tasks_enabled: bool = False) -> s
{json_schema}
"""

def get_orchestrator_enhanced_plan_prompt_json() -> str:

return """
You have access to the following team members that can help you address the request each with unique expertise:

{team}

Remember, there is no requirement to involve all team members -- a team member's particular expertise may not be needed for this task.


{additional_instructions}

You are an expert in orchestrating multi-agent systems and designing agentic workflows. You specialize in translating user goals into structured, executable plans that involve multiple tools and agents. Your task is to generate a hierarchical plan that breaks down the user’s objective into clear steps and sub-steps, where each sub-step is a coherent unit of work that can be executed by a single agent.

You have already received the user query and the clarified answers to any ambiguities. Now, your job is to create a structured plan that can be used to orchestrate tool usage and agent behavior.

Each plan should:
- Be hierarchical: include high-level steps and their corresponding sub-steps.
- Ensure that all sub-steps under a step are logically grouped and can be executed by the same agent.
- Include a short description of the purpose of each step and sub-step.
- Be tool-aware: if a sub-step requires a specific tool (e.g., search, summarization, generation), mention it.
- Be executable: avoid vague or abstract actions. Be specific and actionable.

You must also include:
- Terms: Key concepts or domain-specific terms relevant to the task.
- Task: A concise summary of the goal and objective.
- Steps: A breakdown of the plan into steps and sub-steps.
- Title and Details for each step and sub-step.
- Agent Name: The agent responsible for executing each step or sub-step.
- Need_Plan: A boolean indicating whether a plan is needed (always true in this context).
- Response: A short natural language summary of the plan.
- Plan_Summary: A concise overview of the entire plan.

IMPORTANT NOTE:
The environment you operate in can be very dynamic and at times you might face a situation where you are unable to make progress on the task due to several reasons. In such cases, you should try a couple of times to make progress on the task, and if you are still unable to make progress, you should create a new plan that addresses the failures in trying to complete the task previously. You should also try to avoid getting stuck in a loop and do not keep retrying too many times.
Also this is to be followed by all team members. So as you design the plan, ensure to add this instruction to all team members so that they too do not waste time trying to complete the task if they are unable to make progress on it, they they should return back to you for help

INPUT:
USER_QUERY:
{user_query}

OUTPUT FORMAT:
Return a JSON object with the following structure:

{{
"terms": ["term1", "term2", ...],
"task": "summary of the goal and objective",
"needs_plan": true,
"response": "Short natural language summary of the plan",
"plan_summary": "Concise overview of the plan",
"steps": {{
"Step 1": {{
"title": "Title of the step",
"details": "Purpose and scope of this step",
"agent_name": "Agent responsible for this step",
"substeps": {{
"1.1": {{
"title": "Title of the sub-step",
"details": "What this sub-step does",
"status": "Current state in plan, recap prev steps and highlights next steps"
}},
"1.2": {{
...
}}
}}
}},
"Step 2": {{
...
}}
}}
}}

DO NOT OUTPUT ANYTHING OTHER THAN THE JSON OBJECT."""



def get_orchestrator_plan_replan_json(sentinel_tasks_enabled: bool = False) -> str:
"""Get the orchestrator replan prompt in JSON format, with optional SentinelPlanStep support."""
Expand Down Expand Up @@ -907,3 +982,12 @@ def validate_plan_json(
if "title" not in item or "details" not in item or "agent_name" not in item:
return False
return True

def validate_enhanced_plan_json(json_response: Dict[str, Any]) -> bool:
if not isinstance(json_response, dict):
return False
required_keys = ["terms","task","needs_plan","response","plan_summary","steps"]
for key in required_keys:
if key not in json_response:
return False
return True
Loading