-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmigrate_database.py
More file actions
169 lines (134 loc) · 5.17 KB
/
migrate_database.py
File metadata and controls
169 lines (134 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# migrate_database.py
"""
数据库迁移脚本:
1. 为 ChatHistory 表添加 username 字段
2. 将现有用户密码转换为加密格式
"""
import sys
from sqlalchemy import create_engine, Column, String, text
from sqlalchemy.orm import sessionmaker
from app.auth import get_password_hash
# 数据库配置(与 server.py 保持一致)
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:123456@localhost:3306/rag_db"
engine = create_engine(SQLALCHEMY_DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def migrate_chat_history():
"""为 ChatHistory 表添加 username 字段"""
print("📊 [Migration] 正在迁移 ChatHistory 表...")
with engine.connect() as conn:
try:
# 检查字段是否已存在
result = conn.execute(text("""
SELECT COUNT(*) as count
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'rag_db'
AND TABLE_NAME = 'chat_history'
AND COLUMN_NAME = 'username'
"""))
exists = result.fetchone()[0] > 0
if exists:
print(" ✅ username 字段已存在,跳过迁移")
else:
# 添加 username 字段
conn.execute(text("""
ALTER TABLE chat_history
ADD COLUMN username VARCHAR(50) DEFAULT 'default' AFTER id
"""))
conn.commit()
print(" ✅ 已添加 username 字段")
# 添加索引以提升查询性能
conn.execute(text("""
ALTER TABLE chat_history
ADD INDEX idx_username (username)
"""))
conn.commit()
print(" ✅ 已添加 username 索引")
except Exception as e:
print(f" ❌ ChatHistory 迁移失败: {e}")
conn.rollback()
return False
return True
def migrate_user_passwords():
"""将现有用户的明文密码转换为加密密码"""
print("\n🔐 [Migration] 正在迁移用户密码...")
db = SessionLocal()
try:
# 导入 User 模型
from server import User
users = db.query(User).all()
if not users:
print(" ℹ️ 没有需要迁移的用户")
return True
migrated_count = 0
for user in users:
# 判断密码是否已经是 bcrypt 格式
# bcrypt 密码以 $2b$ 或 $2a$ 开头
if not user.password.startswith('$2b$') and not user.password.startswith('$2a$'):
old_password = user.password
new_password = get_password_hash(old_password)
user.password = new_password
migrated_count += 1
print(f" ✅ 已迁移用户: {user.username}")
if migrated_count > 0:
db.commit()
print(f"\n ✅ 成功迁移 {migrated_count} 个用户的密码")
else:
print(" ℹ️ 所有密码已经是加密格式")
return True
except Exception as e:
print(f" ❌ 用户密码迁移失败: {e}")
db.rollback()
return False
finally:
db.close()
def verify_migration():
"""验证迁移结果"""
print("\n🔍 [Verification] 验证迁移结果...")
db = SessionLocal()
try:
from server import User, ChatHistory
# 验证用户表
user_count = db.query(User).count()
print(f" ✅ 用户表: {user_count} 条记录")
# 验证聊天历史表
history_count = db.query(ChatHistory).count()
print(f" ✅ 聊天历史表: {history_count} 条记录")
# 检查字段
with engine.connect() as conn:
result = conn.execute(text("""
SELECT COLUMN_NAME
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'rag_db'
AND TABLE_NAME = 'chat_history'
"""))
columns = [row[0] for row in result.fetchall()]
print(f" ✅ ChatHistory 字段: {', '.join(columns)}")
print("\n✅ [Success] 数据库迁移完成!")
return True
except Exception as e:
print(f" ❌ 验证失败: {e}")
return False
finally:
db.close()
def main():
"""主函数"""
print("=" * 60)
print("🚀 开始数据库迁移")
print("=" * 60)
# 步骤1:迁移 ChatHistory 表
if not migrate_chat_history():
print("\n❌ 迁移失败,已回滚")
sys.exit(1)
# 步骤2:迁移用户密码
if not migrate_user_passwords():
print("\n❌ 迁移失败,已回滚")
sys.exit(1)
# 步骤3:验证迁移
if not verify_migration():
print("\n⚠️ 迁移完成但验证时出现问题")
sys.exit(1)
print("\n" + "=" * 60)
print("🎉 所有迁移任务已成功完成!")
print("=" * 60)
if __name__ == "__main__":
main()