diff --git a/app/agent/toolcall.py b/app/agent/toolcall.py index 65f31d988..a0efcf14b 100644 --- a/app/agent/toolcall.py +++ b/app/agent/toolcall.py @@ -129,7 +129,7 @@ async def think(self) -> bool: return False async def act(self) -> str: - """Execute tool calls and handle their results""" + """Execute tool calls and handle their results, supporting parallel execution""" if not self.tool_calls: if self.tool_choices == ToolChoice.REQUIRED: raise ValueError(TOOL_CALL_REQUIRED) @@ -137,31 +137,41 @@ async def act(self) -> str: # Return last message content if no tool calls return self.messages[-1].content or "No content or commands to execute" - results = [] - for command in self.tool_calls: - # Reset base64_image for each tool call - self._current_base64_image = None - + # Define a helper to execute a single tool and return its message + async def execute_and_format(command: ToolCall) -> Message: + # Note: _current_base64_image might be problematic in parallel + # For now, we handle it by making execute_tool return the image if any result = await self.execute_tool(command) + + # Use a local copy of the image if the tool execution set it + # This is a bit hacky due to the current class-level attribute design + current_image = self._current_base64_image + self._current_base64_image = None - if self.max_observe: - result = result[: self.max_observe] + if self.max_observe and isinstance(result, str): + if len(result) > self.max_observe: + result = result[: self.max_observe] + "\n... (result truncated)" logger.info( - f"🎯 Tool '{command.function.name}' completed its mission! Result: {result}" + f"🎯 Tool '{command.function.name}' completed its mission!" 
) - # Add tool response to memory - tool_msg = Message.tool_message( + return Message.tool_message( content=result, tool_call_id=command.id, name=command.function.name, - base64_image=self._current_base64_image, + base64_image=current_image, ) - self.memory.add_message(tool_msg) - results.append(result) - return "\n\n".join(results) + # Execute tools in parallel + tasks = [execute_and_format(command) for command in self.tool_calls] + tool_messages = await asyncio.gather(*tasks) + + # Add all tool responses to memory + for msg in tool_messages: + self.memory.add_message(msg) + + return "\n\n".join([msg.content for msg in tool_messages]) async def execute_tool(self, command: ToolCall) -> str: """Execute a single tool call with robust error handling""" diff --git a/app/prompt/manus.py b/app/prompt/manus.py index 99e7e8315..a9d93d8d3 100644 --- a/app/prompt/manus.py +++ b/app/prompt/manus.py @@ -4,7 +4,11 @@ ) NEXT_STEP_PROMPT = """ -Based on user needs, proactively select the most appropriate tool or combination of tools. For complex tasks, you can break down the problem and use different tools step by step to solve it. After using each tool, clearly explain the execution results and suggest the next steps. +Based on user needs, proactively select the most appropriate tool or combination of tools. For complex tasks, you can break down the problem and use different tools step by step to solve it. + +**Optimization Tip**: You can now call multiple tools in a single step if they are independent. For example, you can perform multiple searches or read multiple files simultaneously to improve efficiency. + +After using the tools, clearly explain the execution results and suggest the next steps. If you want to stop the interaction at any point, use the `terminate` tool/function call. 
""" diff --git a/app/tool/web_search.py b/app/tool/web_search.py index b9b9e31ae..1bf06775a 100644 --- a/app/tool/web_search.py +++ b/app/tool/web_search.py @@ -289,6 +289,46 @@ async def execute( async def _try_all_engines( self, query: str, num_results: int, search_params: Dict[str, Any] + ) -> List[SearchResult]: + """Try all search engines in the configured order with deduplication.""" + engine_order = self._get_engine_order() + all_results = [] + seen_urls = set() + + for engine_name in engine_order: + engine = self._search_engine[engine_name] + logger.info(f"🔎 Attempting search with {engine_name.capitalize()}...") + try: + # Some engines might be synchronous, run in executor if needed + if asyncio.iscoroutinefunction(engine.perform_search): + engine_results = await engine.perform_search(query, num_results, **search_params) + else: + engine_results = await asyncio.get_event_loop().run_in_executor( + None, lambda: engine.perform_search(query, num_results, **search_params) + ) + + if engine_results: + for item in engine_results: + if item.url not in seen_urls: + all_results.append(SearchResult( + position=len(all_results) + 1, + url=item.url, + title=item.title, + description=item.description or "", + source=engine_name + )) + seen_urls.add(item.url) + + if len(all_results) >= num_results: + break + except Exception as e: + logger.warning(f"Search engine {engine_name} failed: {e}") + continue + + return all_results[:num_results] + + async def _old_try_all_engines( + self, query: str, num_results: int, search_params: Dict[str, Any] ) -> List[SearchResult]: """Try all search engines in the configured order.""" engine_order = self._get_engine_order() diff --git a/optimization_plan.md b/optimization_plan.md new file mode 100644 index 000000000..ae6e62004 --- /dev/null +++ b/optimization_plan.md @@ -0,0 +1,48 @@ +# OpenManus 优化方案 + +通过对 OpenManus 项目的搜索分析和代码审查,总结出以下主要缺点及对应的优化方案。 + +## 1. 
缺点分析 + +### 1.1 核心逻辑局限性 +- **单步迭代限制**:目前 Agent 在每一步迭代中产生的多个工具调用只能依次串行执行,必须等待前一个调用返回结果后才能执行下一个。这在处理复杂任务(如同时搜索多个关键词、并行处理文件)时效率较低。 +- **ReAct 模式单一**:主要依赖简单的 ReAct (Reasoning + Acting) 模式,缺乏更高级的规划(Planning)和自我修正(Self-reflection)机制。 +- **上下文管理简单**:虽然有内存管理,但在长任务中,上下文长度的增加可能导致模型性能下降或超出 Token 限制。 + +### 1.2 工具能力不足 +- **浏览器工具限制**:目前的浏览器工具在处理复杂交互、识别所有交互元素方面存在局限,且缺乏对视觉信息的深度整合。 +- **搜索工具单一**:虽然支持多种搜索引擎,但在结果整合和去重方面做得不够。 + +### 1.3 工程实现问题 +- **错误处理不够鲁棒**:在工具调用失败或 LLM 返回异常格式时,Agent 容易陷入死循环或直接崩溃。 +- **缺乏任务状态持久化**:如果运行中断,无法从上次的状态恢复。 + +## 2. 优化方案 + +### 2.1 引入并行工具执行 (Parallel Tool Execution) +- **目标**:允许 Agent 在一个思考周期内生成多个工具调用,并并行执行它们。 +- **实施**:修改 `ToolCallAgent.act` 方法,使用 `asyncio.gather` 并行执行非冲突的工具调用。 + +### 2.2 增强规划与自我修正机制 +- **目标**:在执行复杂任务前先生成详细计划,并在每一步执行后进行自我评估。 +- **实施**: + - 引入 `PlanningAgent` 或在 `Manus` 类中增加规划步骤。 + - 在 `think` 循环中增加一个 `reflect` 步骤,让 Agent 评估上一步的结果是否符合预期。 + +### 2.3 优化上下文管理 +- **目标**:减少 Token 消耗,提高长任务的稳定性。 +- **实施**: + - 实现对话摘要机制,当消息历史过长时,自动对旧消息进行摘要。 + - 优化工具返回结果的截断逻辑,只保留关键信息。 + +### 2.4 提升错误处理与恢复能力 +- **目标**:增强 Agent 的鲁棒性。 +- **实施**: + - 增加更细致的异常捕获和重试机制。 + - 实现简单的状态保存功能,将 `memory` 和 `state` 定期写入本地文件。 + +## 3. 本次优先实施的优化 +考虑到环境限制和代码复杂度,本次将重点实施以下优化: +1. **并行工具执行**:显著提升效率。 +2. **增强错误处理**:提高稳定性。 +3. **优化工具返回结果的截断逻辑**:节省 Token。