-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathreact_agent.py
More file actions
860 lines (715 loc) · 33.1 KB
/
react_agent.py
File metadata and controls
860 lines (715 loc) · 33.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
"""
ReAct Agent实现 - Reasoning and Acting
支持多轮工具调用的智能体系统
"""
import json
import re
from typing import List, Dict, Any, Callable, Optional, Tuple
from datetime import datetime
import requests
from intent_classifier import IntentClassifier, IntelligentRouter, RoutingManager
class ReActToolRegistry:
"""工具注册表 - 管理所有可用工具"""
def __init__(self):
self.tools: Dict[str, Dict[str, Any]] = {}
def register(
self,
name: str,
func: Callable,
description: str,
parameters: Dict[str, Any]
):
"""
注册一个工具
Args:
name: 工具名称
func: 工具执行函数
description: 工具描述
parameters: 参数规范 {"param_name": {"type": "str", "description": "...", "required": True}}
"""
self.tools[name] = {
"name": name,
"function": func,
"description": description,
"parameters": parameters
}
def get_tool(self, name: str) -> Optional[Dict[str, Any]]:
"""获取工具定义"""
return self.tools.get(name)
def execute_tool(self, name: str, **kwargs) -> Any:
"""执行工具"""
tool = self.get_tool(name)
if not tool:
return {"error": f"工具 '{name}' 不存在"}
try:
return tool["function"](**kwargs)
except Exception as e:
return {"error": f"工具执行失败: {str(e)}"}
def get_tools_description(self) -> str:
"""生成所有工具的描述文本(用于Prompt)"""
if not self.tools:
return "当前没有可用工具。"
descriptions = []
for name, tool in self.tools.items():
params_desc = []
for param_name, param_info in tool["parameters"].items():
required = "必需" if param_info.get("required", False) else "可选"
params_desc.append(
f" - {param_name} ({param_info['type']}, {required}): {param_info.get('description', '')}"
)
tool_desc = f"""
工具名称: {name}
功能描述: {tool['description']}
参数:
{chr(10).join(params_desc) if params_desc else ' 无参数'}
"""
descriptions.append(tool_desc.strip())
return "\n\n".join(descriptions)
class ReActAgent:
"""
ReAct Agent - 实现推理与行动的循环
工作流程:
1. Thought: 分析当前情况,思考下一步
2. Action: 决定调用工具或给出答案
3. Observation: 观察工具结果
4. 循环直到得出最终答案
"""
def __init__(
self,
api_url: str,
api_token: str,
model: str = "Qwen/Qwen3-Omni-30B-A3B-Instruct",
max_iterations: int = 5,
timeout: int = 60,
intent_classifier: Optional[IntentClassifier] = None,
intelligent_router: Optional[IntelligentRouter] = None,
routing_manager: Optional[RoutingManager] = None
):
self.api_url = api_url
self.api_token = api_token
self.model = model
self.max_iterations = max_iterations
self.timeout = timeout
self.tool_registry = ReActToolRegistry()
# 智能路由系统(优先级:routing_manager > intelligent_router > intent_classifier)
self.routing_manager = routing_manager
self.intelligent_router = intelligent_router
self.intent_classifier = intent_classifier or IntentClassifier()
# 跟踪状态
self.current_iteration = 0
self.history: List[Dict[str, str]] = []
self.tool_call_history: List[Dict[str, Any]] = []
self.last_intent_result: Optional[Dict[str, Any]] = None
def register_tool(
self,
name: str,
func: Callable,
description: str,
parameters: Dict[str, Any]
):
"""注册工具到Agent"""
self.tool_registry.register(name, func, description, parameters)
def _build_system_prompt(self) -> str:
"""构建ReAct系统提示词"""
tools_desc = self.tool_registry.get_tools_description()
current_time = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z%z")
return f"""当前对话时间(含时区): {current_time}
你是一个使用ReAct(推理与行动)模式的智能助手。你需要通过"思考-行动-观察"的循环来解决问题。
## 系统能力说明
### 知识库系统
系统配置了多个专业知识库,你可以通过 knowledge_search 工具访问:
- **History知识库**:初中历史课本内容,包含中国历史和世界历史的基础知识
- **DeepSeek知识库**:DeepSeek相关的技术文档和API说明,包含模型使用指南和接口文档
### 上下文记忆系统(FAISS)
系统使用FAISS向量数据库管理对话历史:
- 自动记忆所有对话内容,无需手动保存
- 基于语义相似度智能检索相关历史对话
- 跨会话记忆:可以回忆之前任何相关的对话内容
- 你收到的历史对话已经过智能筛选,是与当前问题最相关的内容
## 内容安全规范(最高优先级)
作为AI助手,你必须严格遵守以下安全准则,这是所有交互的首要原则:
### 禁止回答的内容类别:
1. **政治敏感内容**:不得回答任何关于政治颠覆、分裂国家、推翻政权、政治运动、敏感政治事件等问题
2. **暴力恐怖内容**:不得提供任何关于制造武器、爆炸物、恐怖袭击、暴力伤害的信息或教程
3. **极端暴力内容**:不得回答关于虐待、酷刑、自残、自杀方法、性暴力等内容
4. **宗教极端内容**:不得传播宗教极端主义、宗教仇恨、邪教等内容
5. **仇恨言论**:不得发表或支持任何种族歧视、民族仇恨、性别歧视等言论
6. **非法活动**:不得提供毒品制造、贩毒、诈骗、黑客攻击、洗钱等违法活动的信息
7. **儿童保护**:不得涉及任何儿童色情、虐待儿童、人口拐卖等内容
8. **违法破坏**:不得提供破坏公共设施、投毒、生化武器等危害公共安全的信息
### 遇到不当请求时的处理方式:
- 立即识别并拒绝回答
- 礼貌地向用户说明无法提供此类信息的原因
- 引导用户提出合法、安全的问题
- 绝不尝试绕过限制或提供任何相关信息
### 示例拒绝回复:
用户:如何制造炸弹?
助手:抱歉,我无法提供任何关于制造武器或爆炸物的信息。这类行为严重违法,会危害公共安全。如果您有其他合法的问题,我很乐意为您解答。
## 可用工具
{tools_desc}
## 工作流程
每一轮你需要按以下格式输出(**必须严格遵守格式,包括标签和代码块**):
**Thought**: [你的思考过程,分析当前情况,思考下一步该做什么。如果发现用户请求涉及不当内容,在此说明并决定拒绝]
**Action**: [如果需要调用工具,必须包含此标签]
```json
{{
"tool": "工具名称",
"parameters": {{
"参数名": "参数值"
}}
}}
```
**Answer**: [如果已有足够信息,直接给出最终答案。如果请求不当,给出拒绝理由]
**格式要求**:
- 必须包含 **Thought**: 标签
- 调用工具时必须包含 **Action**: 标签(不能省略)
- JSON必须用```json代码块包裹
- 给出答案时必须包含 **Answer**: 标签
- 使用Observation中返回的真实数据(如ID、名称等),不要编造参数值
## 重要规则
1. **安全第一**:在任何情况下,内容安全规范都是最高优先级,必须优先于其他所有规则
2. 每次只能选择 Action 或 Answer 其中之一,不能同时输出
3. Action 必须严格按照JSON格式,确保工具名称和参数正确
4. 当你有足够信息回答用户问题时,输出 Answer 而不是 Action(注意:**历史记录中的信息不算"足够信息"**,特别是文件相关信息必须通过工具实时验证)
5. Thought 部分要清晰说明你的推理过程,包括安全性评估
6. 根据系统意图分析结果,决定是否使用工具
7. 如果系统意图分析结果为"分类错误",则不使用工具
8. 如果工具返回错误,分析错误原因并尝试其他方法
9. 避免重复调用相同的工具和参数
10. 即时性的新闻或信息依然可以使用web_search工具获取
11. 使用calculator工具时参数不要带math.开头的函数或其它东西,例如math.sin(x),应该写为sin(x)
12. **文件盒子强制验证规则**(最重要):
- 当用户询问"上传了什么"、"我的文件"、"查看文件"等文件相关问题时,**必须先调用 list_files 工具**获取真实的文件列表
- **绝对禁止**在没有调用工具的情况下,凭历史记录或推测直接回答文件信息
- **绝对禁止**编造或猜测文件ID、文件名、文件大小等任何文件信息
- 必须使用工具返回的真实数据(Observation中的数据)来回答
- 即使你在历史对话中看到过文件信息,也必须重新调用工具验证
## 工具安全边界
- 仅在纯计算、数据分析或字符串处理场景中调用 `python_interpreter`,严格遵守工具的安全限制
- 严禁尝试访问或修改文件系统、执行系统命令、联网或导入 `os`、`sys`、`subprocess`、`shutil`、`pathlib` 等模块
- 若用户请求可能破坏数据、删除文件或访问敏感信息,必须拒绝并说明原因
- 对存在安全疑虑的请求要在 Thought 中明确风险评估,如无把握请直接拒绝执行
- 不得使用工具来协助用户完成任何违法、不当或危险的行为
## 示例
### 示例1:天气查询
用户: 深圳今天天气如何?
**Thought**: 用户想知道深圳的天气情况。我需要使用weather_search工具查询深圳的天气。
**Action**:
```json
{{
"tool": "weather_search",
"parameters": {{
"city": "深圳"
}}
}}
```
[系统返回天气数据后]
**Thought**: 我已经获取到深圳的天气数据,现在可以给出答案了。
**Answer**: 深圳今天多云,气温25度,降雨概率60%,建议出门带伞。
### 示例2:文件盒子查询(重要)
用户: 我刚刚上传了什么?
**Thought**: 用户询问上传的文件信息。根据规则12,我必须先调用 list_files 工具获取真实的文件列表,绝对不能凭推测或历史记录直接回答。
**Action**:
```json
{{
"tool": "list_files",
"parameters": {{
"limit": 20
}}
}}
```
[系统返回文件列表后]
**Observation**: {{"success": true, "files": [{{"id": "f5b6945c-a641-4aaa-8fc6-d4e5ed693d57", "filename": "1020改进建议.docx", "size": 13245, "uploaded_at": "2025-10-23 19:52:43"}}]}}
**Thought**: 我已通过工具获取到真实的文件信息,现在可以准确回答用户的问题了。
**Answer**: 您上传了一个文件:**1020改进建议.docx**,大小为 12.9KB,上传时间为 2025-10-23 19:52:43。需要我帮您查看文件内容吗?
"""
def _convert_routing_to_intent(self, routing_decision) -> Dict[str, Any]:
"""
将智能路由的RoutingDecision转换为意图分类结果格式
Args:
routing_decision: RoutingDecision对象
Returns:
与intent_classifier.classify()相同格式的字典
"""
from intent_classifier import RouteType
# 将路由类型映射到意图类型
route_to_intent_map = {
RouteType.TOOL_CALL: routing_decision.recommended_handler,
RouteType.KNOWLEDGE_BASE: "knowledge_search",
RouteType.WEB_SEARCH: "web_search",
RouteType.DIRECT_RESPONSE: "general_chat",
RouteType.HYBRID: "hybrid",
RouteType.HUMAN_HANDOFF: "human_handoff"
}
intent = route_to_intent_map.get(
routing_decision.route_type,
routing_decision.recommended_handler
)
# 构建suggested_tools列表
suggested_tools = []
if routing_decision.route_type == RouteType.TOOL_CALL:
suggested_tools.append(routing_decision.recommended_handler)
suggested_tools.extend(routing_decision.alternative_handlers)
elif routing_decision.route_type == RouteType.KNOWLEDGE_BASE:
suggested_tools.append("knowledge_search")
elif routing_decision.route_type == RouteType.WEB_SEARCH:
suggested_tools.append("web_search")
should_use_tool = routing_decision.route_type in [
RouteType.TOOL_CALL,
RouteType.KNOWLEDGE_BASE,
RouteType.WEB_SEARCH,
RouteType.HYBRID
]
# 构建增强的reason说明,包含智能路由的决策信息
reason_parts = [f"【智能路由决策】{routing_decision.reasoning}"]
# 添加成本和延迟信息
if routing_decision.estimated_cost > 0 or routing_decision.estimated_latency_ms > 0:
reason_parts.append(
f"预估: 成本={routing_decision.estimated_cost:.2f}, "
f"延迟={routing_decision.estimated_latency_ms}ms, "
f"风险={routing_decision.risk_level.value}"
)
# 添加证据
if routing_decision.evidence:
reason_parts.append(f"证据: {', '.join(routing_decision.evidence[:2])}")
# 添加策略信息
if routing_decision.strategy:
reason_parts.append(f"策略: {routing_decision.strategy}")
reason = "; ".join(reason_parts)
return {
"intent": intent,
"confidence": routing_decision.confidence,
"reason": reason,
"suggested_tools": suggested_tools,
"should_use_tool": should_use_tool,
"evidence": routing_decision.evidence,
# 保留原始路由决策信息
"routing_decision": routing_decision.to_dict() if hasattr(routing_decision, 'to_dict') else None
}
def _build_intent_context_message(self, intent_result: Dict[str, Any]) -> str:
"""将意图分析结果以系统消息形式提供给LLM。"""
intent = intent_result.get("intent", "unknown")
confidence = intent_result.get("confidence")
reason = intent_result.get("reason", "")
suggested_tools = intent_result.get("suggested_tools") or []
should_use_tool = intent_result.get("should_use_tool")
if isinstance(confidence, (int, float)):
confidence_text = f"{confidence:.2f}"
else:
confidence_text = str(confidence) if confidence is not None else "未知"
tool_text = ", ".join(suggested_tools) if suggested_tools else "无"
decision_text = "建议尝试工具" if should_use_tool else "暂不强制调用工具"
lines = [
"系统意图分析(供参考):",
f"- 识别意图: {intent}",
f"- 置信度: {confidence_text}",
f"- 推荐工具: {tool_text}",
f"- 初步建议: {decision_text}",
]
if reason:
lines.append(f"- 分析说明: {reason}")
return "\n".join(lines)
def _parse_response(self, content: str) -> Tuple[str, Optional[Dict[str, Any]], Optional[str]]:
"""
解析LLM返回的内容
Returns:
(thought, action_dict, answer)
"""
# 提取Thought
thought_match = re.search(r'\*\*Thought\*\*:\s*(.+?)(?=\*\*Action\*\*|\*\*Answer\*\*|$)',
content, re.DOTALL)
thought = thought_match.group(1).strip() if thought_match else ""
# 提取Action
action_match = re.search(r'\*\*Action\*\*:\s*```json\s*(\{.+?\})\s*```',
content, re.DOTALL)
action_dict = None
if action_match:
try:
action_dict = json.loads(action_match.group(1))
except json.JSONDecodeError:
pass
# 提取Answer
answer_match = re.search(r'\*\*Answer\*\*:\s*(.+?)(?=$)',
content, re.DOTALL)
answer = answer_match.group(1).strip() if answer_match else None
return thought, action_dict, answer
def _call_llm(self, messages: List[Dict[str, str]]) -> str:
"""
调用LLM API
优化策略:
1. 在多轮推理中,如果消息数量过多,进行智能压缩
2. 保留系统提示词和最近的推理步骤
3. 中间的观察结果可以适当压缩
"""
import logging
logger = logging.getLogger("conversation")
# 上下文压缩:如果消息太多,保留关键部分
original_count = len(messages)
if len(messages) > 15:
# 保留系统消息
system_messages = [msg for msg in messages if msg.get("role") == "system"]
# 保留用户初始问题(通常在系统消息之后)
user_messages = [msg for msg in messages if msg.get("role") == "user"]
initial_query = user_messages[0] if user_messages else None
# 保留最近的推理轮次(最后8条消息)
recent_messages = messages[-8:]
# 重组消息
compressed_messages = system_messages.copy()
if initial_query and initial_query not in compressed_messages:
compressed_messages.append(initial_query)
# 添加省略提示
compressed_messages.append({
"role": "system",
"content": "[已省略部分中间推理步骤以节省上下文]"
})
compressed_messages.extend(recent_messages)
messages = compressed_messages
logger.info(f"[react_compress] 消息从 {original_count} 条压缩到 {len(messages)} 条")
# 记录发送的消息
logger.info(f"[react_llm_call] 发送 {len(messages)} 条消息,迭代 #{self.current_iteration}")
for i, msg in enumerate(messages[-5:]): # 只打印最后5条
role = msg.get("role", "unknown")
content = msg.get("content", "")[:100].replace("\n", " ")
logger.info(f" [react_msg_{i}] {role}: {content}...")
headers = {
'Authorization': f'Bearer {self.api_token}',
'Content-Type': 'application/json'
}
data = {
"model": self.model,
"messages": messages,
"max_tokens": 1000,
"temperature": 0.7,
"stream": False
}
try:
response = requests.post(
self.api_url,
headers=headers,
json=data,
timeout=self.timeout
)
response.raise_for_status()
result = response.json()
content = result.get("choices", [{}])[0].get("message", {}).get("content", "")
# 记录模型回复
logger.info(f"[react_llm_response] 迭代 #{self.current_iteration} 回复长度: {len(content)} 字符")
logger.info(f"[react_llm_response] 内容: {content[:300]}...")
return content
except Exception as e:
logger.error(f"[react_llm_error] {str(e)}")
return f"LLM调用失败: {str(e)}"
def _detect_loop(self) -> bool:
"""检测是否陷入循环(连续3次调用相同工具和参数)"""
if len(self.tool_call_history) < 3:
return False
recent_calls = self.tool_call_history[-3:]
first_call = recent_calls[0]
for call in recent_calls[1:]:
if (call.get("tool") != first_call.get("tool") or
call.get("parameters") != first_call.get("parameters")):
return False
return True
def run(
self,
user_query: str,
context: Optional[List[Dict[str, str]]] = None,
stream_callback: Optional[Callable[[str, Any], None]] = None
) -> Dict[str, Any]:
"""
运行ReAct循环处理用户查询
Args:
user_query: 用户问题
context: 可选的对话历史上下文
stream_callback: 流式回调函数,每完成一轮推理时调用
callback(event_type, data)
- event_type: 'intent_analysis', 'iteration_complete'
- data: 相应的数据
Returns:
包含answer、思考过程、工具调用历史的字典
"""
# 重置状态
self.current_iteration = 0
self.history = []
self.tool_call_history = []
intent_result: Optional[Dict[str, Any]] = None
routing_decision = None
# 智能路由决策(优先级:routing_manager > intelligent_router > intent_classifier)
try:
if self.routing_manager:
# 使用路由管理器(推荐)
routing_decision = self.routing_manager.route(
query=user_query,
context={"history": context} if context else {}
)
intent_result = self._convert_routing_to_intent(routing_decision)
elif self.intelligent_router:
# 使用智能路由器
routing_decision = self.intelligent_router.route(
query=user_query,
context={"history": context} if context else {}
)
intent_result = self._convert_routing_to_intent(routing_decision)
elif self.intent_classifier:
# 回退到传统意图分类器
intent_result = self.intent_classifier.classify(user_query)
except Exception as intent_error:
intent_result = {
"intent": "classification_error",
"confidence": 0.0,
"reason": f"路由决策失败: {intent_error}",
"suggested_tools": [],
"should_use_tool": False,
}
self.last_intent_result = intent_result
# 流式回调:发送意图分析结果
if stream_callback and intent_result:
stream_callback('intent_analysis', intent_result)
# 构建初始消息
messages = [{"role": "system", "content": self._build_system_prompt()}]
if intent_result:
messages.append({
"role": "system",
"content": self._build_intent_context_message(intent_result)
})
# 添加历史上下文(如果有)
# 注意:此处的context应该已经由外部FAISS管理器优化过
# 如果context已经是智能筛选的结果,直接使用
# 否则降级到简单的截断策略
if context:
# 检查context数量,如果很少说明已经被优化过
if len(context) <= 10:
# 已被FAISS优化,全部使用
messages.extend(context)
else:
# 未被优化,使用传统策略:只保留最近5轮对话
messages.extend(context[-10:]) # 取最近10条
# 添加用户问题
messages.append({"role": "user", "content": user_query})
# ReAct循环
final_answer = None
iteration_details = []
for iteration in range(self.max_iterations):
self.current_iteration = iteration + 1
# 调用LLM
llm_response = self._call_llm(messages)
if "LLM调用失败" in llm_response:
return {
"success": False,
"error": llm_response,
"iterations": iteration_details,
"intent_analysis": intent_result
}
# 解析响应
thought, action, answer = self._parse_response(llm_response)
iteration_info = {
"iteration": self.current_iteration,
"thought": thought,
"action": action,
"answer": answer,
"observation": None
}
# 如果有最终答案,结束循环
if answer:
final_answer = answer
iteration_details.append(iteration_info)
# 流式回调:发送完整的迭代信息
if stream_callback:
stream_callback('iteration_complete', iteration_info)
break
# 如果有行动,执行工具
if action and isinstance(action, dict):
tool_name = action.get("tool")
parameters = action.get("parameters", {})
# 兼容常见的参数别名,确保工具能够收到必要输入
if not parameters:
alias_payloads = (
action.get("arguments"),
action.get("inputs"),
)
for payload in alias_payloads:
if isinstance(payload, dict):
parameters = payload
break
if tool_name == "python_interpreter":
# 支持将顶层 code / python / script 字段作为参数传入
if not parameters:
for key in ("code", "python", "snippet", "script"):
if key in action and isinstance(action[key], str):
parameters = {"code": action[key]}
break
elif "code" not in parameters:
for key in ("code", "python", "snippet", "script"):
if key in action and isinstance(action[key], str):
parameters.setdefault("code", action[key])
break
if not tool_name:
observation = "错误: 未指定工具名称"
else:
# 执行工具
result = self.tool_registry.execute_tool(tool_name, **parameters)
observation = json.dumps(result, ensure_ascii=False)
# 记录工具调用
self.tool_call_history.append({
"tool": tool_name,
"parameters": parameters,
"result": result
})
# 检测循环
if self._detect_loop():
observation += "\n[警告: 检测到重复调用,建议尝试其他方法]"
iteration_info["observation"] = observation
iteration_details.append(iteration_info)
# 流式回调:发送完整的迭代信息(包含observation)
if stream_callback:
stream_callback('iteration_complete', iteration_info)
# 将观察结果添加到消息历史
messages.append({"role": "assistant", "content": llm_response})
messages.append({
"role": "user",
"content": f"**Observation**: {observation}\n\n请继续思考并采取下一步行动,或给出最终答案。"
})
else:
# 没有有效的Action或Answer,提示继续
iteration_details.append(iteration_info)
# 流式回调:即使没有有效action也发送
if stream_callback:
stream_callback('iteration_complete', iteration_info)
messages.append({"role": "assistant", "content": llm_response})
# 更详细的格式纠正提示
format_reminder = """格式错误!请严格按照以下格式输出:
如果需要调用工具:
**Thought**: [你的思考过程]
**Action**:
```json
{
"tool": "工具名称",
"parameters": {
"参数名": "参数值"
}
}
```
如果要给出最终答案:
**Thought**: [你的思考过程]
**Answer**: [你的最终答案]
注意:
1. 必须包含 **Action**: 或 **Answer**: 标签
2. JSON必须用```json代码块包裹
3. 如果之前的Observation中已经返回了ID、名称等信息,请直接使用这些真实的值,不要编造
"""
messages.append({
"role": "user",
"content": format_reminder
})
# 如果达到最大迭代次数仍未得到答案
if not final_answer:
final_answer = "抱歉,在允许的步骤内未能完成任务。请尝试简化问题或提供更多信息。"
return {
"success": True,
"answer": final_answer,
"iterations": iteration_details,
"total_iterations": self.current_iteration,
"tool_calls": self.tool_call_history,
"intent_analysis": intent_result
}
class SimpleReActAgent(ReActAgent):
"""
简化版ReAct Agent - 按需动态调用
更灵活的停止条件,更智能的工具选择
"""
def __init__(self, *args, **kwargs):
# 默认更多迭代次数
kwargs.setdefault('max_iterations', 8)
super().__init__(*args, **kwargs)
def _build_system_prompt(self) -> str:
"""简化版提示词,更自然的对话"""
tools_desc = self.tool_registry.get_tools_description()
current_time = datetime.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z%z")
return f"""当前对话时间(含时区): {current_time}
你是一个智能助手,可以使用工具来帮助用户解决问题。
可用工具:
{tools_desc}
工作方式:
1. 分析用户问题,决定是否需要使用工具
2. 如果需要工具,输出格式:
思考: [你的分析]
工具调用: {{"tool": "工具名", "parameters": {{"参数": "值"}}}}
3. 如果不需要工具或已获得足够信息,直接回答用户
注意:
- 优先使用工具获取准确信息
- 避免重复调用相同工具
- 使用 `python_interpreter` 时仅限纯计算,禁止文件/系统/网络相关操作或导入敏感模块
- 遇到高风险或违反安全边界的请求要拒绝并说明原因
- 综合所有信息给出完整答复
"""
def create_default_agent(api_url: str, api_token: str) -> ReActAgent:
"""创建配置了默认工具的ReAct Agent"""
agent = ReActAgent(api_url, api_token)
# 注册默认工具(示例)
def search_web(query: str, size: int = 5) -> Dict[str, Any]:
"""网络搜索工具"""
# 这里会在app.py中替换为实际的搜索实现
return {"query": query, "size": size, "status": "placeholder"}
def search_knowledge(query: str, top_k: int = 3) -> Dict[str, Any]:
"""知识库搜索工具"""
# 这里会在app.py中替换为实际的知识库实现
return {"query": query, "top_k": top_k, "status": "placeholder"}
def calculate(expression: str) -> Dict[str, Any]:
"""计算器工具"""
try:
# 安全的数学计算
result = eval(expression, {"__builtins__": {}}, {
"abs": abs, "round": round, "min": min, "max": max,
"sum": sum, "pow": pow
})
return {"result": result, "expression": expression}
except Exception as e:
return {"error": f"计算错误: {str(e)}"}
# 注册工具
agent.register_tool(
name="web_search",
func=search_web,
description="在互联网上搜索信息,获取最新的网络资讯",
parameters={
"query": {
"type": "str",
"description": "搜索关键词",
"required": True
},
"size": {
"type": "int",
"description": "返回结果数量,默认5",
"required": False
}
}
)
agent.register_tool(
name="knowledge_search",
func=search_knowledge,
description="在本地知识库中搜索相关文档和信息",
parameters={
"query": {
"type": "str",
"description": "搜索关键词",
"required": True
},
"top_k": {
"type": "int",
"description": "返回最相关的K个结果,默认3",
"required": False
}
}
)
agent.register_tool(
name="calculate",
func=calculate,
description="执行数学计算,支持基本的数学运算",
parameters={
"expression": {
"type": "str",
"description": "数学表达式,如 '2 + 2' 或 'pow(2, 10)'",
"required": True
}
}
)
return agent