17 changes: 0 additions & 17 deletions backend/README_STRUCTURE.md
@@ -144,20 +144,3 @@ app = FastAPI()
for router in all_routers:
app.include_router(router)
```

## Notes

1. **Backup**: a backup was made to the `temp_restructure/` directory before restructuring
2. **Testing**: all functionality needs to be fully tested after the restructuring
3. **Documentation**: update the related documentation and comments
4. **Deployment**: ensure a smooth transition in the production environment

## Restructuring Status

- [x] Directory structure reorganized
- [x] Files renamed
- [x] Import paths updated
- [x] Module initialization files created
- [x] Backward compatibility maintained

Restructuring complete! The code structure is now clearer and more consistent, making it easier to maintain and extend.
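For reference, the router-registration pattern the (now deleted) README documented, as a standalone sketch; the router names and module layout here are illustrative, not taken from the repo:

```python
from fastapi import APIRouter, FastAPI

# Hypothetical routers collected into a single list, e.g. in a routers package.
users_router = APIRouter(prefix="/users", tags=["users"])
tasks_router = APIRouter(prefix="/tasks", tags=["tasks"])
all_routers = [users_router, tasks_router]

app = FastAPI()
for router in all_routers:
    app.include_router(router)
```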
10 changes: 5 additions & 5 deletions backend/ai_system/config/async_config.py
@@ -11,15 +11,15 @@ class AsyncConfig:

# LLM stream processing configuration
LLM_STREAM_CONFIG = {
"yield_interval": 5, # Yield control every N chunks
"yield_delay": 0.0005, # Delay (seconds) when yielding control
"yield_interval": 3, # Yield control every N chunks (reduced interval)
"yield_delay": 0.0001, # Delay (seconds) when yielding control (reduced delay)
"max_concurrent_tools": 1, # Maximum concurrent tool calls (set to 1: sequential execution)
}

# WebSocket configuration
WEBSOCKET_CONFIG = {
"content_yield_threshold": 50, # Yield control only when content length exceeds this value
"content_yield_delay": 0.0005, # Delay for content sends
"content_yield_threshold": 20, # Yield control only when content length exceeds this value (reduced threshold)
"content_yield_delay": 0.0001, # Delay for content sends (reduced delay)
"json_block_yield_delay": 0, # Delay for JSON block sends (0 means no delay)
"heartbeat_interval": 30, # Heartbeat interval (seconds)
"connection_timeout": 300, # Connection timeout (seconds)
@@ -28,7 +28,7 @@ class AsyncConfig:
# Tool call configuration
TOOL_CALL_CONFIG = {
"sequential_execution": True, # Whether tool calls are executed sequentially
"execution_yield_delay": 0.001, # Delay between tool calls
"execution_yield_delay": 0.0001, # Delay between tool calls (reduced delay)
"max_retry_attempts": 2, # Maximum retry attempts
"retry_delay": 0.1, # Retry delay (seconds)
}
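A minimal sketch of how a stream loop might consume `LLM_STREAM_CONFIG`, assuming an accessor like `AsyncConfig.get_llm_stream_config()` (the handler change below calls one by that name); the chunk iterator and accumulation logic are placeholders:

```python
import asyncio

async def drain_stream(chunks, config):
    """Consume an async chunk iterator, periodically yielding to the event loop."""
    chunk_count = 0
    collected = []
    async for chunk in chunks:
        chunk_count += 1
        collected.append(chunk)  # placeholder for real accumulation / sending
        if chunk_count % config["yield_interval"] == 0:
            # Cede control briefly so WebSocket sends, heartbeats, etc. can run.
            await asyncio.sleep(config["yield_delay"])
    return collected

# Usage (accessor name taken from the handler below):
# config = AsyncConfig.get_llm_stream_config()
# content = await drain_stream(llm_chunks, config)
```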
32 changes: 29 additions & 3 deletions backend/ai_system/core_agents/main_agent.py
@@ -376,7 +376,12 @@ def load_conversation_history(self, history_messages: List[Dict[str, Any]]):
# Add history messages, but filter out tool messages (to avoid an overly long context)
for msg in history_messages:
if msg.get('role') in ["user", "assistant"]:
self.messages.append(msg)
# Ensure the message content is not empty
content = msg.get('content', '').strip()
if content or msg.get('role') == 'user': # keep user messages even when empty
self.messages.append(msg)
else:
logger.warning(f"跳过空的assistant消息: {msg.get('id', 'unknown')}")

# Check whether the context needs to be compressed
self._check_and_compress_context()
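The same keep/drop rule, pulled out as a standalone sketch for illustration (the helper name and sample history are hypothetical):

```python
def keep_message(msg: dict) -> bool:
    """Return True if a history message should be kept in the agent context."""
    if msg.get('role') not in ("user", "assistant"):
        return False  # tool messages are always dropped
    content = (msg.get('content') or '').strip()
    # Keep user messages even when empty; assistant messages must have content.
    return bool(content) or msg.get('role') == 'user'

history = [
    {"role": "user", "content": ""},            # kept
    {"role": "assistant", "content": ""},        # dropped
    {"role": "assistant", "content": "Hello"},   # kept
    {"role": "tool", "content": "result"},       # dropped
]
print([m for m in history if keep_message(m)])
```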
@@ -463,9 +468,27 @@ async def run(self, user_problem: str):
iteration_count += 1
logger.info(f"MainAgent第 {iteration_count} 次迭代")

# Validate the message list to make sure it contains no empty messages
valid_messages = []
for msg in self.messages:
if msg.get('role') == 'assistant' and not msg.get('content', '').strip():
logger.warning(f"跳过空的assistant消息: {msg.get('id', 'unknown')}")
continue
valid_messages.append(msg)

if len(valid_messages) != len(self.messages):
logger.info(f"过滤了 {len(self.messages) - len(valid_messages)} 个空消息")
self.messages = valid_messages

# Process the stream with the async LLM handler
assistant_message, tool_calls = await self.llm_handler.process_stream(
self.messages, self.tools)

# Ensure the assistant message is not empty
if not assistant_message.get('content', '').strip() and not tool_calls:
logger.warning("收到空的assistant消息,跳过添加")
continue

self.messages.append(assistant_message)

if not tool_calls:
Expand Down Expand Up @@ -497,7 +520,7 @@ async def run(self, user_problem: str):
"content": tool_result,
})

# Use the configured delay to tune pacing
# Use the configured delay to tune pacing and keep the event loop unblocked
await asyncio.sleep(config["execution_yield_delay"])

except Exception as e:
@@ -510,8 +533,11 @@ async def run(self, user_problem: str):
})
tool_results.append(f"工具执行失败: {str(e)}")

# Use the configured delay to tune pacing
# Use the configured delay to tune pacing and keep the event loop unblocked
await asyncio.sleep(config["execution_yield_delay"])

# Yield control between tool calls as well, to prevent long blocking periods
await asyncio.sleep(0.001)

logger.info(
f"MainAgent执行完成,总共 {iteration_count} 次迭代,最终消息历史长度: {len(self.messages)}")
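The sequential tool-execution pattern above, reduced to a self-contained sketch; `execute_tool` and `tool_calls` are hypothetical stand-ins:

```python
import asyncio

async def run_tools_sequentially(tool_calls, execute_tool, delay=0.0001):
    """Run tool calls one at a time, yielding to the event loop between calls."""
    results = []
    for call in tool_calls:
        try:
            results.append(await execute_tool(call))
        except Exception as exc:
            results.append(f"Tool execution failed: {exc}")
        # Give the event loop a chance to service WebSocket traffic, heartbeats, etc.
        await asyncio.sleep(delay)
    return results
```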
9 changes: 6 additions & 3 deletions backend/ai_system/core_handlers/llm_handler.py
@@ -113,10 +113,9 @@ async def process_stream(self, messages: List[Dict[str, Any]], tools: List[Dict[
for i, tool_call in enumerate(delta.tool_calls):
logger.debug(f" Chunk {chunk_count} 工具调用 {i}: id={tool_call.id}, name={getattr(tool_call.function, 'name', 'None') if tool_call.function else 'None'}")

# Periodically yield control so other async tasks can run
# Yield control after every chunk so the event loop is never blocked for long
config = AsyncConfig.get_llm_stream_config()
if chunk_count % (config["yield_interval"] * 2) == 0:
await asyncio.sleep(config["yield_delay"])
await asyncio.sleep(config["yield_delay"])

# After the streaming response ends, process all tool calls together (full-wait mode)
if tool_call_chunks:
@@ -140,6 +139,10 @@ async def process_stream(self, messages: List[Dict[str, Any]], tools: List[Dict[
await self.stream_manager.finalize_message()

# Build the complete assistant message
# Ensure content is not empty, even when there are only tool calls
if not full_response_content.strip() and tool_calls:
full_response_content = "正在执行工具调用..."

assistant_message = {"role": "assistant",
"content": full_response_content}
if tool_calls:
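A condensed sketch of the fallback introduced here: never emit an assistant message whose content is empty, since some OpenAI-compatible backends reject such messages (the helper name is hypothetical):

```python
def build_assistant_message(full_response_content, tool_calls=None):
    """Sketch: build an assistant message that never has empty content."""
    if not full_response_content.strip() and tool_calls:
        # Placeholder text mirrors the diff's fallback.
        full_response_content = "Executing tool calls..."
    message = {"role": "assistant", "content": full_response_content}
    if tool_calls:
        message["tool_calls"] = tool_calls
    return message
```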
2 changes: 1 addition & 1 deletion backend/ai_system/core_managers/stream_manager.py
@@ -61,7 +61,7 @@ async def _output(self, content: str):
logger.debug(f"成功调用回调函数,内容长度: {len(content)}")

# Yield control so the event loop can service other tasks
await asyncio.sleep(0.001)
await asyncio.sleep(0.0001) # reduced delay for better responsiveness
except Exception as e:
logger.error(f"回调函数调用失败: {e}")
else:
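Worth noting that `await asyncio.sleep(0)` also yields to the event loop without adding wall-clock latency; a small positive delay additionally throttles the producer. A minimal sketch of the trade-off (names are illustrative):

```python
import asyncio

async def emit(chunks, callback, delay=0.0):
    """Send chunks through a callback, yielding to the event loop after each one."""
    for chunk in chunks:
        await callback(chunk)
        # asyncio.sleep(0) yields control with no added latency;
        # a small positive delay (e.g. 0.0001) also slows the producer down.
        await asyncio.sleep(delay)
```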
57 changes: 32 additions & 25 deletions backend/ai_system/core_tools/code_executor.py
@@ -303,34 +303,41 @@ async def _run_in_subprocess(self, temp_file: str) -> str:
env['PYTHONIOENCODING'] = 'utf-8'

# Execute in a subprocess, with the working directory set to workspace_dir
result = subprocess.run(
[sys.executable, temp_file],
# Use asyncio.subprocess to avoid blocking the event loop
process = await asyncio.create_subprocess_exec(
sys.executable, temp_file,
cwd=self.workspace_dir, # key: set the working directory
capture_output=True,
text=True,
encoding='utf-8',
timeout=60, # 60-second timeout
env=env,
errors='replace' # handle encoding errors
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env
)

# Handle the execution result
if result.returncode != 0:
error_output = result.stderr.strip() if result.stderr else "Code execution failed with no error output"
return f"Execution error (return code: {result.returncode}):\n{error_output}"

# Return stdout, handling a possible None value
if result.stdout is None:
output = ""
else:
output = result.stdout.strip()

return output if output else "Code execution finished with no output"

except subprocess.TimeoutExpired:
return "代码执行超时(60秒),请检查是否有无限循环或耗时操作"
except subprocess.CalledProcessError as e:
return f"子进程执行失败: {str(e)}"
try:
# Wait for the process to finish, with a timeout
stdout, stderr = await asyncio.wait_for(
process.communicate(),
timeout=60 # 60-second timeout
)

# Decode the output
stdout_text = stdout.decode('utf-8', errors='replace') if stdout else ""
stderr_text = stderr.decode('utf-8', errors='replace') if stderr else ""

# Handle the execution result
if process.returncode != 0:
error_output = stderr_text.strip() if stderr_text else "Code execution failed with no error output"
return f"Execution error (return code: {process.returncode}):\n{error_output}"

# Return stdout
output = stdout_text.strip()
return output if output else "Code execution finished with no output"

except asyncio.TimeoutError:
# Timeout handling
process.kill()
await process.wait()
return "代码执行超时(60秒),请检查是否有无限循环或耗时操作"

except Exception as e:
logger.error(f"子进程执行异常: {e}")
return f"子进程执行异常: {str(e)}"
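Condensed into a self-contained form, the non-blocking subprocess pattern adopted here looks roughly like the following; the 60-second limit mirrors the diff, while the function name and argument handling are placeholders:

```python
import asyncio
import sys

async def run_script(path: str, timeout: float = 60.0) -> str:
    """Run a Python file in a subprocess without blocking the event loop."""
    process = await asyncio.create_subprocess_exec(
        sys.executable, path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        process.kill()
        await process.wait()  # reap the killed process to avoid a zombie
        return f"Execution timed out after {timeout} seconds"
    if process.returncode != 0:
        return stderr.decode('utf-8', errors='replace')
    return stdout.decode('utf-8', errors='replace')
```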
1 change: 1 addition & 0 deletions backend/pyproject.toml
@@ -24,4 +24,5 @@ dependencies = [
"scipy>=1.15.3",
"pandas>=2.3.1",
"seaborn>=0.13.2",
"watchdog>=6.0.0",
]
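The diff does not show where `watchdog` is consumed; assuming it is used for its usual purpose, file-system watching, the standard observer pattern looks like this (paths and handler are illustrative):

```python
import time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

class LogChanges(FileSystemEventHandler):
    def on_modified(self, event):
        print(f"modified: {event.src_path}")

observer = Observer()
observer.schedule(LogChanges(), path=".", recursive=True)
observer.start()
try:
    time.sleep(5)  # watch for a few seconds
finally:
    observer.stop()
    observer.join()
```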