-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontent_moderator.py
More file actions
272 lines (233 loc) · 10.6 KB
/
content_moderator.py
File metadata and controls
272 lines (233 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
"""
内容审查模块 - 用于识别和过滤不当内容
包括前置审查(用户输入)和后置审查(模型输出)
"""
import re
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime
logger = logging.getLogger("conversation")


class ContentModerator:
    """Rule-based content moderator.

    Flags unsafe text by matching it against a category-grouped keyword
    lexicon and a set of regex patterns, then maps the hits to a risk level
    (safe / medium / high / critical) and a recommended action
    (allow / warn / block). Intended both for pre-moderation of user input
    and post-moderation of model output.
    """

    # Keyword lexicon grouped by category (extend as needed). Category names
    # and keywords are runtime data — they are surfaced in moderation results
    # and user-facing text — so they must stay exactly as written.
    SENSITIVE_KEYWORDS = {
        "政治敏感": [
            "政变", "颠覆", "推翻政权", "反动", "暴政",
            "独立运动", "分裂国家", "台独", "港独", "藏独", "疆独",
            "六四", "天安门事件", "法轮功", "民运"
        ],
        "暴力恐怖": [
            "恐怖袭击", "爆炸物制作", "炸弹制造", "自制武器",
            "杀人方法", "谋杀计划", "暗杀", "投毒", "恐怖分子",
            "ISIS", "基地组织", "塔利班", "恐怖主义",
            "枪支制造", "武器交易", "人体炸弹"
        ],
        "极端暴力": [
            "虐杀", "肢解", "酷刑", "折磨致死", "虐待儿童",
            "性暴力", "强奸", "轮奸", "猥亵", "人口贩卖",
            "自残教程", "自杀方法", "割腕", "跳楼指南"
        ],
        "宗教极端": [
            "宗教战争", "圣战", "异教徒", "宗教迫害",
            "邪教", "洗脑", "精神控制", "宗教极端主义"
        ],
        "仇恨言论": [
            "种族灭绝", "种族歧视", "民族仇恨", "性别歧视",
            "LGBT歧视", "残疾歧视", "仇恨犯罪", "屠杀"
        ],
        "非法活动": [
            "毒品制造", "贩毒", "洗钱方法", "诈骗教程",
            "黑客攻击", "网络犯罪", "身份盗窃", "信用卡欺诈",
            "色情交易", "卖淫", "人体器官买卖"
        ],
        "违法破坏": [
            "破坏公共设施", "纵火", "投毒水源", "生物武器",
            "化学武器", "核武器", "黑客入侵", "DDoS攻击"
        ]
    }

    # Fuzzy-match rules: (regex pattern, category reported on a hit).
    SENSITIVE_PATTERNS = [
        (r"如何.*?(杀|害|伤害).*?人", "暴力伤害"),
        (r"怎么.*?(制造|制作).*?(炸弹|爆炸物|武器)", "武器制造"),
        (r"(自杀|轻生).*?(方法|教程|步骤)", "自杀相关"),
        (r"如何.*?(推翻|颠覆).*?(政府|政权)", "政治敏感"),
        (r"(贩毒|制毒|毒品).*?(方法|教程)", "毒品相关"),
        (r"如何.*?(诈骗|欺骗|骗取)", "诈骗相关"),
        (r"(儿童|未成年).*?(色情|性)", "儿童色情"),
        (r"如何.*?(入侵|攻击|黑).*?(系统|网站|服务器)", "黑客攻击"),
    ]

    def __init__(self, strict_mode: bool = True):
        """Initialize the moderator.

        Args:
            strict_mode: when True, "high"-risk content is blocked instead of
                only triggering a warning.
        """
        self.strict_mode = strict_mode
        self.check_count = 0   # total number of check_content() invocations
        self.block_count = 0   # number of checks whose action was "block"
        # Compile the regexes once so check_content() does not repeat the
        # per-call re-module pattern lookup for every rule.
        self._compiled_patterns = [
            (re.compile(pattern, re.IGNORECASE), category)
            for pattern, category in self.SENSITIVE_PATTERNS
        ]

    def check_content(self, content: str, content_type: str = "unknown") -> Dict[str, Any]:
        """Screen *content* for sensitive material.

        Args:
            content: text to screen.
            content_type: caller-supplied label (user_input / model_output),
                used only for logging.

        Returns:
            {
                "is_safe": bool,               # whether the content is safe
                "risk_level": str,             # safe / medium / high / critical
                "categories": List[str],       # triggered sensitive categories
                "matched_keywords": List[str], # matched keywords (capped at 5)
                "matched_patterns": List[str], # categories of matched regexes
                "reason": str,                 # human-readable explanation
                "action": str,                 # allow / warn / block
                "timestamp": str               # ISO-8601 check time
            }
        """
        self.check_count += 1
        # Empty or non-string input cannot match any rule — treat as safe.
        if not content or not isinstance(content, str):
            return self._safe_result()

        content_lower = content.lower()
        # Dict used as an insertion-ordered, de-duplicated set so the
        # reported category order is deterministic (a plain set was not).
        category_hits: Dict[str, None] = {}
        matched_keywords: List[str] = []
        matched_patterns: List[str] = []

        # 1. Exact keyword matching (case-insensitive).
        for category, keywords in self.SENSITIVE_KEYWORDS.items():
            for keyword in keywords:
                if keyword.lower() in content_lower:
                    category_hits[category] = None
                    matched_keywords.append(keyword)

        # 2. Regex pattern matching against the original (un-lowered) text.
        for regex, category in self._compiled_patterns:
            if regex.search(content):
                category_hits[category] = None
                matched_patterns.append(category)

        # 3. Overall risk assessment.
        if not category_hits:
            return self._safe_result()

        total_matches = len(matched_keywords) + len(matched_patterns)
        critical_categories = {"暴力恐怖", "极端暴力", "政治敏感", "违法破坏", "儿童色情"}
        high_risk_categories = {"宗教极端", "仇恨言论", "非法活动"}
        has_critical = bool(category_hits.keys() & critical_categories)
        has_high_risk = bool(category_hits.keys() & high_risk_categories)

        # Map the hits to a risk level and a recommended action.
        if has_critical or total_matches >= 3:
            risk_level, action = "critical", "block"
        elif has_high_risk or total_matches >= 2:
            risk_level = "high"
            action = "block" if self.strict_mode else "warn"
        else:
            # total_matches >= 1 is guaranteed here (we returned early when
            # nothing matched), so everything left is at least "medium"; the
            # former "low"/"allow" branch was unreachable and has been removed.
            risk_level, action = "medium", "warn"
        if action == "block":
            self.block_count += 1

        categories_list = list(category_hits)
        reason = self._build_reason(categories_list, matched_keywords, matched_patterns)
        # Lazy %-style args avoid formatting when the log level filters this out.
        logger.warning(
            "[content_moderation] type=%s risk=%s action=%s categories=%s matches=%s",
            content_type, risk_level, action, categories_list, total_matches,
        )
        return {
            "is_safe": False,
            "risk_level": risk_level,
            "categories": categories_list,
            "matched_keywords": matched_keywords[:5],  # cap echoed keywords at 5
            "matched_patterns": matched_patterns,
            "reason": reason,
            "action": action,
            "timestamp": datetime.now().isoformat()
        }

    def _safe_result(self) -> Dict[str, Any]:
        """Return the moderation result for content with no rule hits."""
        return {
            "is_safe": True,
            "risk_level": "safe",
            "categories": [],
            "matched_keywords": [],
            "matched_patterns": [],
            "reason": "内容安全",
            "action": "allow",
            "timestamp": datetime.now().isoformat()
        }

    def _build_reason(self, categories: List[str], keywords: List[str], patterns: List[str]) -> str:
        """Build the human-readable explanation for a non-safe result."""
        reasons = []
        if categories:
            reasons.append(f"检测到敏感内容类别: {', '.join(categories)}")
        if keywords:
            # Show at most three keywords to keep the message short.
            keyword_preview = keywords[:3]
            reasons.append(f"匹配敏感关键词: {', '.join(keyword_preview)}")
        if patterns:
            reasons.append(f"匹配敏感模式: {', '.join(patterns)}")
        return "; ".join(reasons)

    def get_safe_response(self, risk_level: str, categories: List[str]) -> str:
        """Generate a refusal reply appropriate for the given risk level.

        Args:
            risk_level: risk level from check_content().
            categories: triggered categories (used in the "high" reply).

        Returns:
            Refusal text to show the user.
        """
        if risk_level == "critical":
            return (
                "抱歉,您的问题涉及到严重违法或极端敏感内容,我无法提供任何相关信息。\n\n"
                "请注意:\n"
                "- 制造武器、爆炸物、毒品等违法行为将受到法律严惩\n"
                "- 暴力、恐怖主义、极端主义活动严重危害社会安全\n"
                "- 请遵守法律法规,维护社会和谐稳定\n\n"
                "如果您有其他合法的问题,我很乐意为您解答。"
            )
        elif risk_level == "high":
            category_text = "、".join(categories[:2]) if categories else "敏感"
            return (
                f"抱歉,您的问题涉及{category_text}内容,可能违反相关法律法规或社会道德规范。\n\n"
                "作为AI助手,我的职责是提供合法、安全、有益的信息。"
                "我无法协助任何可能造成伤害或违法的行为。\n\n"
                "如果您有其他问题,我很乐意帮助您。"
            )
        elif risk_level == "medium":
            return (
                "抱歉,您的问题包含一些敏感内容。出于安全考虑,我无法直接回答。\n\n"
                "如果您的问题是出于学术研究或合法目的,请提供更多背景信息,"
                "我会尽力在合法合规的范围内为您提供帮助。"
            )
        else:
            return (
                "您的问题包含一些需要谨慎对待的内容。请确保您的问题符合法律法规要求。"
            )

    def check_user_input(self, user_input: str) -> Dict[str, Any]:
        """Pre-moderation: screen a user's input before it reaches the model."""
        return self.check_content(user_input, content_type="user_input")

    def check_model_output(self, model_output: str) -> Dict[str, Any]:
        """Post-moderation: screen the model's output before it is returned."""
        return self.check_content(model_output, content_type="model_output")

    def get_statistics(self) -> Dict[str, Any]:
        """Return moderation counters and the block rate as a percentage string."""
        return {
            "total_checks": self.check_count,
            "total_blocks": self.block_count,
            # max(..., 1) guards against division by zero before any check ran.
            "block_rate": f"{(self.block_count / max(self.check_count, 1) * 100):.2f}%"
        }
# Process-wide singleton shared by the module-level convenience helpers.
_global_moderator: Optional[ContentModerator] = None


def get_content_moderator(strict_mode: bool = True) -> ContentModerator:
    """Return the global ContentModerator, creating it on first use.

    Note: strict_mode only has an effect on the very first call; subsequent
    calls return the already-constructed singleton unchanged.
    """
    global _global_moderator
    moderator = _global_moderator
    if moderator is None:
        moderator = ContentModerator(strict_mode=strict_mode)
        _global_moderator = moderator
    return moderator
def check_content_safety(content: str, content_type: str = "unknown") -> Dict[str, Any]:
    """Convenience wrapper: screen *content* with the global moderator.

    Args:
        content: text to screen.
        content_type: caller-supplied label (e.g. user_input / model_output).

    Returns:
        The moderation result dict from ContentModerator.check_content().
    """
    return get_content_moderator().check_content(content, content_type)