diff --git a/backend/app/services/report_agent.py b/backend/app/services/report_agent.py index 02ca5bdc2..28e24b902 100644 --- a/backend/app/services/report_agent.py +++ b/backend/app/services/report_agent.py @@ -19,7 +19,7 @@ from enum import Enum from ..config import Config -from ..utils.llm_client import LLMClient +from ..utils.llm_client import LLMClient, estimate_messages_tokens from ..utils.logger import get_logger from .zep_tools import ( ZepToolsService, @@ -529,7 +529,7 @@ def to_dict(self) -> Dict[str, Any]: 1. 自动读取人设文件,了解所有模拟Agent 2. 智能选择与采访主题最相关的Agent(如学生、媒体、官方等) 3. 自动生成采访问题 -4. 调用 /api/simulation/interview/batch 接口在双平台进行真实采访 +4. 直接调用 SimulationRunner 在双平台进行真实采访 5. 整合所有采访结果,提供多视角分析 【使用场景】 @@ -717,22 +717,12 @@ def to_dict(self) -> Dict[str, Any]: 【工作流程】 ═══════════════════════════════════════════════════════════════ -每次回复你只能做以下两件事之一(不可同时做): - -选项A - 调用工具: -输出你的思考,然后用以下格式调用一个工具: - -{{"name": "工具名称", "parameters": {{"参数名": "参数值"}}}} - -系统会执行工具并把结果返回给你。你不需要也不能自己编写工具返回结果。 - -选项B - 输出最终内容: -当你已通过工具获取了足够信息,以 "Final Answer:" 开头输出章节内容。 +1. 先调用工具(3-5次)检索模拟数据,系统会自动将结果返回给你 +2. 获得足够信息后,以 "Final Answer:" 开头输出章节正文内容 ⚠️ 严格禁止: -- 禁止在一次回复中同时包含工具调用和 Final Answer -- 禁止自己编造工具返回结果(Observation),所有工具结果由系统注入 -- 每次回复最多调用一个工具 +- 禁止在输出 Final Answer 时再调用工具 +- 禁止自己编造工具返回结果,所有工具结果由系统自动注入 ═══════════════════════════════════════════════════════════════ 【章节内容要求】 @@ -786,9 +776,8 @@ def to_dict(self) -> Dict[str, Any]: - ✅ 直接写正文,用**粗体**代替小节标题 请开始: -1. 首先思考(Thought)这个章节需要什么信息 -2. 然后调用工具(Action)获取模拟数据 -3. 收集足够信息后输出 Final Answer(纯正文,无任何标题)""" +1. 调用工具获取模拟数据(至少3次,混合使用不同工具) +2. 
收集足够信息后输出 Final Answer:(纯正文,无任何标题)""" # ── ReACT 循环内消息模板 ── @@ -916,7 +905,7 @@ def __init__( logger.info(f"ReportAgent 初始化完成: graph_id={graph_id}, simulation_id={simulation_id}") def _define_tools(self) -> Dict[str, Dict[str, Any]]: - """定义可用工具""" + """定义可用工具(内部描述格式,供 _get_tools_description 使用)""" return { "insight_forge": { "name": "insight_forge", @@ -951,6 +940,59 @@ def _define_tools(self) -> Dict[str, Dict[str, Any]]: } } } + + def _get_openai_tools(self) -> List[Dict[str, Any]]: + """ + 将内部工具定义转换为 OpenAI function calling 格式。 + + Returns: + OpenAI tools 列表 + """ + openai_tools = [] + param_schemas = { + "insight_forge": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "你想深入分析的问题或话题"}, + "report_context": {"type": "string", "description": "当前报告章节的上下文(可选)"}, + }, + "required": ["query"], + }, + "panorama_search": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "搜索查询,用于相关性排序"}, + "include_expired": {"type": "boolean", "description": "是否包含过期/历史内容(默认true)"}, + }, + "required": ["query"], + }, + "quick_search": { + "type": "object", + "properties": { + "query": {"type": "string", "description": "搜索查询字符串"}, + "limit": {"type": "integer", "description": "返回结果数量(默认10)"}, + }, + "required": ["query"], + }, + "interview_agents": { + "type": "object", + "properties": { + "interview_topic": {"type": "string", "description": "采访主题或需求描述"}, + "max_agents": {"type": "integer", "description": "最多采访Agent数量(默认5,最大10)"}, + }, + "required": ["interview_topic"], + }, + } + for name, tool in self.tools.items(): + openai_tools.append({ + "type": "function", + "function": { + "name": name, + "description": tool["description"], + "parameters": param_schemas[name], + }, + }) + return openai_tools def _execute_tool(self, tool_name: str, parameters: Dict[str, Any], report_context: str = "") -> str: """ @@ -1279,105 +1321,167 @@ def _generate_section_react( {"role": "system", "content": system_prompt}, {"role": 
"user", "content": user_prompt} ] - - # ReACT循环 + + # Context budget: 保留 system+user 消息不剪裁;observation 消息超限时截断最旧的 + CONTEXT_TOKEN_BUDGET = 28000 # 为 completion 留出 ~4K tokens + + # ReACT循环(使用 OpenAI native function calling) tool_calls_count = 0 - max_iterations = 5 # 最大迭代轮数 - min_tool_calls = 3 # 最少工具调用次数 - conflict_retries = 0 # 工具调用与Final Answer同时出现的连续冲突次数 - used_tools = set() # 记录已调用过的工具名 + max_iterations = 12 # 每次工具调用占 2 次迭代(request + tool result),留余量 + min_tool_calls = 3 + used_tools: set = set() all_tools = {"insight_forge", "panorama_search", "quick_search", "interview_agents"} + openai_tools = self._get_openai_tools() # 报告上下文,用于InsightForge的子问题生成 report_context = f"章节标题: {section.title}\n模拟需求: {self.simulation_requirement}" - + for iteration in range(max_iterations): if progress_callback: progress_callback( - "generating", - int((iteration / max_iterations) * 100), + "generating", + int((tool_calls_count / max(self.MAX_TOOL_CALLS_PER_SECTION, 1)) * 100), f"深度检索与撰写中 ({tool_calls_count}/{self.MAX_TOOL_CALLS_PER_SECTION})" ) - - # 调用LLM - response = self.llm.chat( - messages=messages, - temperature=0.5, - max_tokens=4096 - ) - # 检查 LLM 返回是否为 None(API 异常或内容为空) - if response is None: - logger.warning(f"章节 {section.title} 第 {iteration + 1} 次迭代: LLM 返回 None") - # 如果还有迭代次数,添加消息并重试 - if iteration < max_iterations - 1: - messages.append({"role": "assistant", "content": "(响应为空)"}) - messages.append({"role": "user", "content": "请继续生成内容。"}) - continue - # 最后一次迭代也返回 None,跳出循环进入强制收尾 - break + # ── Context budget 检查:超限时移除最旧的工具调用消息组 ── + while estimate_messages_tokens(messages) > CONTEXT_TOKEN_BUDGET and len(messages) > 3: + # messages[0]=system, messages[1]=user(初始) + # native function calling 格式: assistant(tool_calls) + tool(result) 成对出现 + if len(messages) > 3: + removed_tokens = estimate_messages_tokens(messages[2:4]) + messages = messages[:2] + messages[4:] + logger.warning( + f"章节 {section.title}: 上下文超限,移除旧工具消息对 " + f"(~{removed_tokens} tokens), 剩余 {len(messages)} 条消息" 
+ ) + else: + break - logger.debug(f"LLM响应: {response[:200]}...") + # ── 工具额度已耗尽,切换到无工具模式让 LLM 输出 Final Answer ── + active_tools = openai_tools if tool_calls_count < self.MAX_TOOL_CALLS_PER_SECTION else None - # 解析一次,复用结果 - tool_calls = self._parse_tool_calls(response) - has_tool_calls = bool(tool_calls) - has_final_answer = "Final Answer:" in response + # 如果工具额度已耗尽,追加提示 + if active_tools is None and tool_calls_count >= self.MAX_TOOL_CALLS_PER_SECTION: + messages.append({ + "role": "user", + "content": REACT_TOOL_LIMIT_MSG.format( + tool_calls_count=tool_calls_count, + max_tool_calls=self.MAX_TOOL_CALLS_PER_SECTION, + ), + }) + active_tools = None # 不传 tools,强制文本输出 - # ── 冲突处理:LLM 同时输出了工具调用和 Final Answer ── - if has_tool_calls and has_final_answer: - conflict_retries += 1 - logger.warning( - f"章节 {section.title} 第 {iteration+1} 轮: " - f"LLM 同时输出工具调用和 Final Answer(第 {conflict_retries} 次冲突)" - ) + try: + if active_tools: + content, native_tool_calls = self.llm.chat_with_tools( + messages=messages, + tools=active_tools, + temperature=0.5, + max_tokens=4096, + ) + else: + content = self.llm.chat(messages=messages, temperature=0.5, max_tokens=4096) + native_tool_calls = None + except Exception as e: + logger.error(f"章节 {section.title} LLM 调用失败: {e}") + if iteration < max_iterations - 1: + continue + break - if conflict_retries <= 2: - # 前两次:丢弃本次响应,要求 LLM 重新回复 - messages.append({"role": "assistant", "content": response}) - messages.append({ - "role": "user", - "content": ( - "【格式错误】你在一次回复中同时包含了工具调用和 Final Answer,这是不允许的。\n" - "每次回复只能做以下两件事之一:\n" - "- 调用一个工具(输出一个 块,不要写 Final Answer)\n" - "- 输出最终内容(以 'Final Answer:' 开头,不要包含 )\n" - "请重新回复,只做其中一件事。" - ), - }) + if content is None and native_tool_calls is None: + logger.warning(f"章节 {section.title} 第 {iteration + 1} 次迭代: LLM 返回为空") + if iteration < max_iterations - 1: continue - else: - # 第三次:降级处理,截断到第一个工具调用,强制执行 - logger.warning( - f"章节 {section.title}: 连续 {conflict_retries} 次冲突," - "降级为截断执行第一个工具调用" - ) - first_tool_end = 
response.find('') - if first_tool_end != -1: - response = response[:first_tool_end + len('')] - tool_calls = self._parse_tool_calls(response) - has_tool_calls = bool(tool_calls) - has_final_answer = False - conflict_retries = 0 + break + + has_final_answer = bool(content and "Final Answer:" in content) # 记录 LLM 响应日志 if self.report_logger: self.report_logger.log_llm_response( section_title=section.title, section_index=section_index, - response=response, + response=content or "", iteration=iteration + 1, - has_tool_calls=has_tool_calls, - has_final_answer=has_final_answer + has_tool_calls=bool(native_tool_calls), + has_final_answer=has_final_answer, ) - # ── 情况1:LLM 输出了 Final Answer ── + # ── 情况1:LLM 调用了工具 ── + if native_tool_calls: + # 将 assistant 消息(含 tool_calls)加入历史 + assistant_msg: Dict[str, Any] = { + "role": "assistant", + "content": content, + "tool_calls": [ + { + "id": tc["id"], + "type": "function", + "function": { + "name": tc["name"], + "arguments": json.dumps(tc["parameters"], ensure_ascii=False), + }, + } + for tc in native_tool_calls + ], + } + messages.append(assistant_msg) + + # 依次执行所有工具调用,加入 tool 消息 + for tc in native_tool_calls: + if tool_calls_count >= self.MAX_TOOL_CALLS_PER_SECTION: + # 超额的工具调用返回提示,不真正执行 + tool_result = "工具调用次数已达上限,请直接输出 Final Answer。" + else: + if self.report_logger: + self.report_logger.log_tool_call( + section_title=section.title, + section_index=section_index, + tool_name=tc["name"], + parameters=tc["parameters"], + iteration=iteration + 1, + ) + tool_result = self._execute_tool( + tc["name"], tc["parameters"], report_context=report_context + ) + if self.report_logger: + self.report_logger.log_tool_result( + section_title=section.title, + section_index=section_index, + tool_name=tc["name"], + result=tool_result, + iteration=iteration + 1, + ) + tool_calls_count += 1 + used_tools.add(tc["name"]) + + messages.append({ + "role": "tool", + "tool_call_id": tc["id"], + "content": tool_result, + }) + + # 如果已达上限,提示输出 Final 
Answer(下一轮不传 tools) + if tool_calls_count >= self.MAX_TOOL_CALLS_PER_SECTION: + unused_tools = all_tools - used_tools + if unused_tools: + logger.info( + f"章节 {section.title}: 工具调用达上限," + f"未使用的工具: {unused_tools}" + ) + continue + + # ── 情况2:LLM 输出文本(含或不含 Final Answer) ── if has_final_answer: - # 工具调用次数不足,拒绝并要求继续调工具 if tool_calls_count < min_tool_calls: - messages.append({"role": "assistant", "content": response}) + # 工具调用不足,要求继续 + messages.append({"role": "assistant", "content": content}) unused_tools = all_tools - used_tools - unused_hint = f"(这些工具还未使用,推荐用一下他们: {', '.join(unused_tools)})" if unused_tools else "" + unused_hint = ( + f"(建议使用这些未用过的工具: {', '.join(unused_tools)})" + if unused_tools else "" + ) messages.append({ "role": "user", "content": REACT_INSUFFICIENT_TOOLS_MSG.format( @@ -1388,131 +1492,62 @@ def _generate_section_react( }) continue - # 正常结束 - final_answer = response.split("Final Answer:")[-1].strip() + final_answer = content.split("Final Answer:")[-1].strip() logger.info(f"章节 {section.title} 生成完成(工具调用: {tool_calls_count}次)") - if self.report_logger: self.report_logger.log_section_content( section_title=section.title, section_index=section_index, content=final_answer, - tool_calls_count=tool_calls_count + tool_calls_count=tool_calls_count, ) return final_answer - # ── 情况2:LLM 尝试调用工具 ── - if has_tool_calls: - # 工具额度已耗尽 → 明确告知,要求输出 Final Answer - if tool_calls_count >= self.MAX_TOOL_CALLS_PER_SECTION: - messages.append({"role": "assistant", "content": response}) - messages.append({ - "role": "user", - "content": REACT_TOOL_LIMIT_MSG.format( - tool_calls_count=tool_calls_count, - max_tool_calls=self.MAX_TOOL_CALLS_PER_SECTION, - ), - }) - continue - - # 只执行第一个工具调用 - call = tool_calls[0] - if len(tool_calls) > 1: - logger.info(f"LLM 尝试调用 {len(tool_calls)} 个工具,只执行第一个: {call['name']}") - - if self.report_logger: - self.report_logger.log_tool_call( - section_title=section.title, - section_index=section_index, - tool_name=call["name"], - 
parameters=call.get("parameters", {}), - iteration=iteration + 1 - ) - - result = self._execute_tool( - call["name"], - call.get("parameters", {}), - report_context=report_context + # 没有 Final Answer 前缀 + if tool_calls_count >= min_tool_calls: + # 已有足够工具调用,直接采纳 + logger.info( + f"章节 {section.title}: 未检测到 'Final Answer:' 前缀," + f"直接采纳(工具调用: {tool_calls_count}次)" ) - + final_answer = (content or "").strip() if self.report_logger: - self.report_logger.log_tool_result( + self.report_logger.log_section_content( section_title=section.title, section_index=section_index, - tool_name=call["name"], - result=result, - iteration=iteration + 1 - ) - - tool_calls_count += 1 - used_tools.add(call['name']) - - # 构建未使用工具提示 - unused_tools = all_tools - used_tools - unused_hint = "" - if unused_tools and tool_calls_count < self.MAX_TOOL_CALLS_PER_SECTION: - unused_hint = REACT_UNUSED_TOOLS_HINT.format(unused_list="、".join(unused_tools)) - - messages.append({"role": "assistant", "content": response}) - messages.append({ - "role": "user", - "content": REACT_OBSERVATION_TEMPLATE.format( - tool_name=call["name"], - result=result, - tool_calls_count=tool_calls_count, - max_tool_calls=self.MAX_TOOL_CALLS_PER_SECTION, - used_tools_str=", ".join(used_tools), - unused_hint=unused_hint, - ), - }) - continue - - # ── 情况3:既没有工具调用,也没有 Final Answer ── - messages.append({"role": "assistant", "content": response}) - - if tool_calls_count < min_tool_calls: - # 工具调用次数不足,推荐未用过的工具 - unused_tools = all_tools - used_tools - unused_hint = f"(这些工具还未使用,推荐用一下他们: {', '.join(unused_tools)})" if unused_tools else "" - - messages.append({ - "role": "user", - "content": REACT_INSUFFICIENT_TOOLS_MSG_ALT.format( + content=final_answer, tool_calls_count=tool_calls_count, - min_tool_calls=min_tool_calls, - unused_hint=unused_hint, - ), - }) - continue + ) + return final_answer - # 工具调用已足够,LLM 输出了内容但没带 "Final Answer:" 前缀 - # 直接将这段内容作为最终答案,不再空转 - logger.info(f"章节 {section.title} 未检测到 'Final Answer:' 
前缀,直接采纳LLM输出作为最终内容(工具调用: {tool_calls_count}次)") - final_answer = response.strip() + # 工具调用不足,要求继续调用 + messages.append({"role": "assistant", "content": content or ""}) + unused_tools = all_tools - used_tools + unused_hint = ( + f"(建议使用这些未用过的工具: {', '.join(unused_tools)})" + if unused_tools else "" + ) + messages.append({ + "role": "user", + "content": REACT_INSUFFICIENT_TOOLS_MSG_ALT.format( + tool_calls_count=tool_calls_count, + min_tool_calls=min_tool_calls, + unused_hint=unused_hint, + ), + }) - if self.report_logger: - self.report_logger.log_section_content( - section_title=section.title, - section_index=section_index, - content=final_answer, - tool_calls_count=tool_calls_count - ) - return final_answer - # 达到最大迭代次数,强制生成内容 logger.warning(f"章节 {section.title} 达到最大迭代次数,强制生成") messages.append({"role": "user", "content": REACT_FORCE_FINAL_MSG}) - - response = self.llm.chat( - messages=messages, - temperature=0.5, - max_tokens=4096 - ) - # 检查强制收尾时 LLM 返回是否为 None - if response is None: + try: + response = self.llm.chat(messages=messages, temperature=0.5, max_tokens=4096) + except Exception: + response = None + + if not response: logger.error(f"章节 {section.title} 强制收尾时 LLM 返回 None,使用默认错误提示") - final_answer = f"(本章节生成失败:LLM 返回空响应,请稍后重试)" + final_answer = "(本章节生成失败:LLM 返回空响应,请稍后重试)" elif "Final Answer:" in response: final_answer = response.split("Final Answer:")[-1].strip() else: @@ -1629,14 +1664,29 @@ def generate_report( # 阶段2: 逐章节生成(分章节保存) report.status = ReportStatus.GENERATING - + total_sections = len(outline.sections) generated_sections = [] # 保存内容用于上下文 - + for i, section in enumerate(outline.sections): section_num = i + 1 base_progress = 20 + int((i / total_sections) * 70) - + + # ── Checkpoint:检查章节是否已生成(支持 resume) ── + checkpoint_content = ReportManager.load_section_checkpoint(report_id, section_num) + if checkpoint_content is not None: + logger.info(f"章节 {section_num} 从 checkpoint 恢复: {section.title}") + section.content = checkpoint_content + 
generated_sections.append(f"## {section.title}\n\n{checkpoint_content}")
+                    completed_section_titles.append(section.title)
+                    ReportManager.update_progress(
+                        report_id, "generating", base_progress,
+                        f"章节 {section.title} 已从缓存恢复 ({section_num}/{total_sections})",
+                        current_section=None,
+                        completed_sections=completed_section_titles
+                    )
+                    continue
+
                 # 更新进度
                 ReportManager.update_progress(
                     report_id, "generating", base_progress,
@@ -1644,14 +1694,14 @@ def generate_report(
                     current_section=section.title,
                     completed_sections=completed_section_titles
                 )
-                
+
                 if progress_callback:
                     progress_callback(
-                        "generating", 
-                        base_progress, 
+                        "generating",
+                        base_progress,
                         f"正在生成章节: {section.title} ({section_num}/{total_sections})"
                     )
-                
+
                 # 生成主章节内容
                 section_content = self._generate_section_react(
                     section=section,
@@ -1659,13 +1709,13 @@ def generate_report(
                     previous_sections=generated_sections,
                     progress_callback=lambda stage, prog, msg:
                         progress_callback(
-                            stage, 
+                            stage,
                             base_progress + int(prog * 0.7 / total_sections),
                             msg
                         ) if progress_callback else None,
                     section_index=section_num
                 )
-                
+
                 section.content = section_content
                 generated_sections.append(f"## {section.title}\n\n{section_content}")
 
@@ -1819,59 +1869,69 @@ def chat(
             "content": message
         })
 
-        # ReACT循环(简化版)
+        # ReACT loop (simplified) using native function calling
         tool_calls_made = []
-        max_iterations = 2  # 减少迭代轮数
-        
-        for iteration in range(max_iterations):
-            response = self.llm.chat(
-                messages=messages,
-                temperature=0.5
-            )
-            
-            # 解析工具调用
-            tool_calls = self._parse_tool_calls(response)
-            
-            if not tool_calls:
-                # 没有工具调用,直接返回响应
-                clean_response = re.sub(r'<tool_call>.*?</tool_call>', '', response, flags=re.DOTALL)
-                clean_response = re.sub(r'\[TOOL_CALL\].*?\)', '', clean_response)
-                
+        openai_tools = self._get_openai_tools()
+
+        for iteration in range(self.MAX_TOOL_CALLS_PER_CHAT + 1):
+            # Once the tool budget is used up, send no tools to force a text reply
+            active_tools = openai_tools if len(tool_calls_made) < self.MAX_TOOL_CALLS_PER_CHAT else None
+
+            try:
+                if active_tools:
+                    content, native_tool_calls = self.llm.chat_with_tools(
+                        messages=messages, tools=active_tools, temperature=0.5
+                    )
+                else:
+                    content = self.llm.chat(messages=messages, temperature=0.5)
+                    native_tool_calls = None
+            except Exception as e:
+                logger.error(f"chat() LLM 调用失败: {e}")
+                break
+
+            if not native_tool_calls:
+                # No tool call requested: return this as the final response
                 return {
-                    "response": clean_response.strip(),
+                    "response": (content or "").strip(),
                     "tool_calls": tool_calls_made,
-                    "sources": [tc.get("parameters", {}).get("query", "") for tc in tool_calls_made]
+                    "sources": [
+                        tc.get("parameters", {}).get("query", "") for tc in tool_calls_made
+                    ],
                 }
-            
-            # 执行工具调用(限制数量)
-            tool_results = []
-            for call in tool_calls[:1]:  # 每轮最多执行1次工具调用
-                if len(tool_calls_made) >= self.MAX_TOOL_CALLS_PER_CHAT:
-                    break
-                result = self._execute_tool(call["name"], call.get("parameters", {}))
-                tool_results.append({
-                    "tool": call["name"],
-                    "result": result[:1500]  # 限制结果长度
-                })
-                tool_calls_made.append(call)
-            
-            # 将结果添加到消息
-            messages.append({"role": "assistant", "content": response})
-            observation = "\n".join([f"[{r['tool']}结果]\n{r['result']}" for r in tool_results])
+
+            # Chat mode executes a single tool per round. Echo back only the
+            # call that is executed: every tool_call id in an assistant message
+            # must be answered by a "tool" message, or the API rejects the request.
+            executed_calls = native_tool_calls[:1]
             messages.append({
-                "role": "user",
-                "content": observation + CHAT_OBSERVATION_SUFFIX
+                "role": "assistant",
+                "content": content,
+                "tool_calls": [
+                    {
+                        "id": tc["id"],
+                        "type": "function",
+                        "function": {
+                            "name": tc["name"],
+                            "arguments": json.dumps(tc["parameters"], ensure_ascii=False),
+                        },
+                    }
+                    for tc in executed_calls
+                ],
             })
-            
+
+            # Run the tool and append its "tool" message
+            for tc in executed_calls:
+                result = self._execute_tool(tc["name"], tc.get("parameters", {}))
+                tool_calls_made.append(tc)
+                messages.append({
+                    "role": "tool",
+                    "tool_call_id": tc["id"],
+                    "content": result[:1500] + CHAT_OBSERVATION_SUFFIX,
+                })
+
         # 达到最大迭代,获取最终响应
-        final_response = self.llm.chat(
-            messages=messages,
-            temperature=0.5
-        )
-        
-        # 清理响应
-        clean_response = re.sub(r'<tool_call>.*?</tool_call>', '', final_response, flags=re.DOTALL)
-        clean_response = re.sub(r'\[TOOL_CALL\].*?\)', '', clean_response)
+        final_response = self.llm.chat(messages=messages, temperature=0.5) or ""
+        clean_response = final_response
 
         return {
             "response": clean_response.strip(),
@@ -2126,7 +2186,36 @@ def save_section(
 
         logger.info(f"章节已保存: {report_id}/{file_suffix}")
         return file_path
-    
+
+    @classmethod
+    def load_section_checkpoint(cls, report_id: str, section_index: int) -> Optional[str]:
+        """
+        Load the body of a previously saved section (used to resume a report).
+
+        Args:
+            report_id: Report ID
+            section_index: Section index (1-based)
+
+        Returns:
+            The section body without its leading "## title" line, or None when
+            the file is missing, unreadable, or holds no body text
+        """
+        file_path = cls._get_section_path(report_id, section_index)
+        if not os.path.exists(file_path):
+            return None
+        try:
+            with open(file_path, 'r', encoding='utf-8') as f:
+                raw = f.read().strip()
+            # Drop the leading "## title" line to recover the bare body
+            lines = raw.split('\n')
+            if lines and lines[0].startswith('## '):
+                lines = lines[1:]
+            # An empty body is not a usable checkpoint; None makes the caller regenerate
+            return '\n'.join(lines).strip() or None
+        except Exception as e:
+            logger.warning(f"加载章节 checkpoint 失败: {report_id}/section_{section_index:02d}.md, {e}")
+            return None
+
     @classmethod
     def _clean_section_content(cls, content: str, section_title: str) -> str:
         """
diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py
index 8c35380d1..d1dd0d2cc 100644
--- a/backend/app/services/simulation_runner.py
+++ b/backend/app/services/simulation_runner.py
@@ -222,20 +222,25 @@ class SimulationRunner:
     _monitor_threads: Dict[str, threading.Thread] = {}
     _stdout_files: Dict[str, Any] = {}  # 存储 stdout 文件句柄
     _stderr_files: Dict[str, Any] = {}  # 存储 stderr 文件句柄
-    
+
     # 图谱记忆更新配置
     _graph_memory_enabled: Dict[str, bool] = {}  # simulation_id -> enabled
+
+    # 保护所有共享 dict 的可重入锁
+    _state_lock: threading.RLock = threading.RLock()
 
     @classmethod
     def get_run_state(cls, simulation_id: str) -> Optional[SimulationRunState]:
         """获取运行状态"""
-        if simulation_id in cls._run_states:
-            return cls._run_states[simulation_id]
-        
-        # 尝试从文件加载
+        with cls._state_lock:
+            if simulation_id in cls._run_states:
+                return
cls._run_states[simulation_id] + + # 尝试从文件加载(文件 IO 不持锁,避免长时间占锁) state = cls._load_run_state(simulation_id) if state: - cls._run_states[simulation_id] = state + with cls._state_lock: + cls._run_states[simulation_id] = state return state @classmethod @@ -300,13 +305,15 @@ def _save_run_state(cls, state: SimulationRunState): sim_dir = os.path.join(cls.RUN_STATE_DIR, state.simulation_id) os.makedirs(sim_dir, exist_ok=True) state_file = os.path.join(sim_dir, "run_state.json") - + data = state.to_detail_dict() - + + # 文件 IO 不持锁 with open(state_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) - - cls._run_states[state.simulation_id] = state + + with cls._state_lock: + cls._run_states[state.simulation_id] = state @classmethod def start_simulation( @@ -446,15 +453,15 @@ def start_simulation( start_new_session=True, # 创建新进程组,确保服务器关闭时能终止所有相关进程 ) - # 保存文件句柄以便后续关闭 - cls._stdout_files[simulation_id] = main_log_file - cls._stderr_files[simulation_id] = None # 不再需要单独的 stderr - state.process_pid = process.pid state.runner_status = RunnerStatus.RUNNING - cls._processes[simulation_id] = process cls._save_run_state(state) - + + with cls._state_lock: + cls._stdout_files[simulation_id] = main_log_file + cls._stderr_files[simulation_id] = None # 不再需要单独的 stderr + cls._processes[simulation_id] = process + # 启动监控线程 monitor_thread = threading.Thread( target=cls._monitor_simulation, @@ -462,7 +469,8 @@ def start_simulation( daemon=True ) monitor_thread.start() - cls._monitor_threads[simulation_id] = monitor_thread + with cls._state_lock: + cls._monitor_threads[simulation_id] = monitor_thread logger.info(f"模拟启动成功: {simulation_id}, pid={process.pid}, platform={platform}") @@ -483,9 +491,10 @@ def _monitor_simulation(cls, simulation_id: str): twitter_actions_log = os.path.join(sim_dir, "twitter", "actions.jsonl") reddit_actions_log = os.path.join(sim_dir, "reddit", "actions.jsonl") - process = cls._processes.get(simulation_id) + with cls._state_lock: + process 
= cls._processes.get(simulation_id) state = cls.get_run_state(simulation_id) - + if not process or not state: return @@ -549,31 +558,28 @@ def _monitor_simulation(cls, simulation_id: str): finally: # 停止图谱记忆更新器 - if cls._graph_memory_enabled.get(simulation_id, False): + with cls._state_lock: + graph_mem_enabled = cls._graph_memory_enabled.pop(simulation_id, False) + if graph_mem_enabled: try: ZepGraphMemoryManager.stop_updater(simulation_id) logger.info(f"已停止图谱记忆更新: simulation_id={simulation_id}") except Exception as e: logger.error(f"停止图谱记忆更新器失败: {e}") - cls._graph_memory_enabled.pop(simulation_id, None) - - # 清理进程资源 - cls._processes.pop(simulation_id, None) - cls._action_queues.pop(simulation_id, None) - - # 关闭日志文件句柄 - if simulation_id in cls._stdout_files: - try: - cls._stdout_files[simulation_id].close() - except Exception: - pass - cls._stdout_files.pop(simulation_id, None) - if simulation_id in cls._stderr_files and cls._stderr_files[simulation_id]: - try: - cls._stderr_files[simulation_id].close() - except Exception: - pass - cls._stderr_files.pop(simulation_id, None) + + # 清理进程资源并关闭文件句柄(持锁摘出句柄,锁外关闭) + with cls._state_lock: + cls._processes.pop(simulation_id, None) + cls._action_queues.pop(simulation_id, None) + stdout_fh = cls._stdout_files.pop(simulation_id, None) + stderr_fh = cls._stderr_files.pop(simulation_id, None) + + for fh in (stdout_fh, stderr_fh): + if fh: + try: + fh.close() + except Exception: + pass @classmethod def _read_action_log( diff --git a/backend/app/services/zep_tools.py b/backend/app/services/zep_tools.py index 384cf540f..744ffc805 100644 --- a/backend/app/services/zep_tools.py +++ b/backend/app/services/zep_tools.py @@ -1362,20 +1362,18 @@ def interview_agents( ) optimized_prompt = f"{INTERVIEW_PROMPT_PREFIX}{combined_prompt}" - # Step 4: 调用真实的采访API(不指定platform,默认双平台同时采访) + # Step 4: 直接调用 SimulationRunner 的批量采访方法(不经过 HTTP,无循环依赖) try: # 构建批量采访列表(不指定platform,双平台采访) interviews_request = [] for agent_idx in selected_indices: 
interviews_request.append({
                     "agent_id": agent_idx,
-                    "prompt": optimized_prompt  # 使用优化后的prompt
-                    # 不指定platform,API会在twitter和reddit两个平台都采访
+                    "prompt": optimized_prompt
                 })
-            
-            logger.info(f"调用批量采访API(双平台): {len(interviews_request)} 个Agent")
-            
-            # 调用 SimulationRunner 的批量采访方法(不传platform,双平台采访)
+
+            logger.info(f"调用批量采访(双平台,direct call): {len(interviews_request)} 个Agent")
+
             api_result = SimulationRunner.interview_agents_batch(
                 simulation_id=simulation_id,
                 interviews=interviews_request,
diff --git a/backend/app/utils/llm_client.py b/backend/app/utils/llm_client.py
index 6c1a81f49..52a21f7a0 100644
--- a/backend/app/utils/llm_client.py
+++ b/backend/app/utils/llm_client.py
@@ -5,12 +5,65 @@
 
 import json
 import re
-from typing import Optional, Dict, Any, List
+from typing import Optional, Dict, Any, List, Tuple
 from openai import OpenAI
 
 from ..config import Config
 
 
+def estimate_tokens(text: str) -> int:
+    """
+    Roughly estimate the token count of a text without depending on tiktoken.
+
+    Rules:
+    - CJK ideographs (U+4E00..U+9FFF) count ~1.5 tokens each
+    - The remaining text counts ~1.3 tokens per whitespace-separated word,
+      but never less than ~1 token per 4 non-space characters, so long
+      unbroken runs (JSON, URLs, base64) are not counted as a single word
+    - A 10% buffer is added on top
+
+    Args:
+        text: Text to estimate
+
+    Returns:
+        Estimated token count
+    """
+    if not text:
+        return 0
+    chinese_chars = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
+    # Blank out the CJK characters, then split what is left on whitespace
+    rest = ''.join(c if c < '\u4e00' or c > '\u9fff' else ' ' for c in text)
+    words = rest.split()
+    word_estimate = len(words) * 1.3
+    # Floor by character count: one huge "word" is still many tokens
+    char_estimate = sum(len(w) for w in words) / 4
+    estimated = int(chinese_chars * 1.5 + max(word_estimate, char_estimate))
+    return int(estimated * 1.1)  # 10% buffer
+
+
+def estimate_messages_tokens(messages: List[Dict]) -> int:
+    """
+    Estimate the total token count of a chat message list.
+
+    Counts each message's text content plus the name and JSON arguments
+    of any native tool calls the message carries.
+    """
+    total = 0
+    for msg in messages:
+        content = msg.get("content") or ""
+        if isinstance(content, list):
+            content = " ".join(
+                c.get("text", "") for c in content if isinstance(c, dict)
+            )
+        total += estimate_tokens(content) + 4  # role + overhead per message
+        # Tool-call payloads live outside "content" but still consume context
+        for call in msg.get("tool_calls") or []:
+            function = (call.get("function") or {}) if isinstance(call, dict) else {}
+            total += estimate_tokens(str(function.get("name", "")))
+            total += estimate_tokens(str(function.get("arguments", "")))
+    return total + 2  # conversation overhead
+
+
 class LLMClient:
     """LLM客户端"""
 
@@ -101,3 +154,62 @@ def chat_json(
         except json.JSONDecodeError:
             raise ValueError(f"LLM返回的JSON格式无效: {cleaned_response}")
 
+    def chat_with_tools(
+        self,
+        messages: List[Dict[str, Any]],
+        tools: List[Dict[str, Any]],
+        temperature: float = 0.7,
+        max_tokens: int = 4096,
+    ) -> Tuple[Optional[str], Optional[List[Dict[str, Any]]]]:
+        """
+        Send a request using OpenAI native function calling.
+
+        Args:
+            messages: Message list (may include "tool" role messages)
+            tools: Tool definitions in OpenAI tools format
+            temperature: Sampling temperature
+            max_tokens: Maximum number of completion tokens
+
+        Returns:
+            (content, tool_calls)
+            - content: Reply text with any <think>...</think> span removed;
+              an empty string when the model returned no text
+            - tool_calls: List of {"id": str, "name": str, "parameters": dict},
+              or None when the model requested no tool call. "parameters" is
+              an empty dict when the arguments are missing or not a JSON object
+        """
+        response = self.client.chat.completions.create(
+            model=self.model,
+            messages=messages,
+            tools=tools,
+            tool_choice="auto",
+            temperature=temperature,
+            max_tokens=max_tokens,
+        )
+
+        message = response.choices[0].message
+        content = message.content or ""
+        # Remove any <think>...</think> span from the reply text
+        content = re.sub(r'<think>[\s\S]*?</think>', '', content).strip()
+
+        if not message.tool_calls:
+            return content, None
+
+        parsed_calls = []
+        for tc in message.tool_calls:
+            try:
+                params = json.loads(tc.function.arguments)
+            except (json.JSONDecodeError, TypeError, AttributeError):
+                # TypeError: arguments is None rather than a JSON string
+                params = {}
+            if not isinstance(params, dict):
+                # Valid JSON but not an object (e.g. a list or a bare string)
+                params = {}
+            parsed_calls.append({
+                "id": tc.id,
+                "name": tc.function.name,
+                "parameters": params,
+            })
+
+        return content, parsed_calls
+