Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ Thumbs.db

# LLM
.claude/

# Logs
log/
112 changes: 56 additions & 56 deletions automation/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
from typing import Optional
from playwright.async_api import Page, BrowserContext
from urllib.parse import urlparse
from .exception_context import exception_context
from logger import get_logger

logger = get_logger("automation.auth")


class AuthManager:
Expand All @@ -23,6 +27,7 @@ def __init__(self, page: Page, context: BrowserContext):
self.page = page
self.context = context

@exception_context("加载Cookie")
async def load_cookies(self, cookie_file: str = "cookies.json") -> bool:
"""
从文件加载Cookie到浏览器
Expand All @@ -31,19 +36,16 @@ async def load_cookies(self, cookie_file: str = "cookies.json") -> bool:
"""
cookie_path = Path(cookie_file)
if not cookie_path.exists():
print(f"⚠ Cookie文件不存在: {cookie_file}")
logger.warning(f"⚠ Cookie文件不存在: {cookie_file}")
return False

try:
with open(cookie_file, 'r', encoding='utf-8') as f:
cookies = json.load(f)
await self.context.add_cookies(cookies)
print(f"✓ Cookie已从文件加载: {cookie_file}")
return True
except Exception as e:
print(f"⚠ 加载Cookie失败: {e}")
return False
with open(cookie_file, 'r', encoding='utf-8') as f:
cookies = json.load(f)
await self.context.add_cookies(cookies)
logger.info(f"✓ Cookie已从文件加载: {cookie_file}")
return True

@exception_context("保存Cookie")
async def save_cookies(self, cookie_file: str = "cookies.json"):
"""
保存当前浏览器的Cookie到文件
Expand All @@ -52,8 +54,9 @@ async def save_cookies(self, cookie_file: str = "cookies.json"):
cookies = await self.context.cookies()
with open(cookie_file, 'w', encoding='utf-8') as f:
json.dump(cookies, f, indent=2, ensure_ascii=False)
print(f"✓ Cookie已保存到: {cookie_file}")
logger.info(f"✓ Cookie已保存到: {cookie_file}")

@exception_context("刷新Cookie")
async def refresh_cookies(self, cookie_file: str = "cookies.json"):
"""
刷新并保存当前浏览器的Cookie到文件
Expand All @@ -63,44 +66,43 @@ async def refresh_cookies(self, cookie_file: str = "cookies.json"):

# 检查按钮是否存在
if await refresh_button.count() > 0:
print("✓ 检测到延长会话按钮,正在点击以刷新Cookie...")
logger.info("✓ 检测到延长会话按钮,正在点击以刷新Cookie...")
await refresh_button.click()
await asyncio.sleep(1) # 等待cookie更新
await self.save_cookies(cookie_file)
await self.load_cookies(cookie_file)

@exception_context("检查Cookie有效性")
async def check_cookie_validity(self) -> bool:
"""
检查Cookie是否有效
通过检查页面内容是否包含"访客不能访问此课程"来判断
:return: True表示Cookie有效,False表示Cookie已失效
"""
try:
page_content = await self.page.content()
if "访客不能访问此课程" in page_content:
print("❌ 检测到Cookie已失效")
return False
return True
except Exception as e:
print(f"⚠ Cookie有效性检测出错: {e}")
return True # 检测失败时默认认为有效,避免误判
page_content = await self.page.content()
if "访客不能访问此课程" in page_content:
logger.error("❌ 检测到Cookie已失效")
return False
return True

@exception_context("使用Cookie登录")
async def login_with_cookies(self, base_url: str, cookie_file: str = "cookies.json") -> bool:
"""
使用Cookie登录
:param base_url: 网站首页或任意需要登录的页面URL
:param cookie_file: Cookie文件路径
:return: 是否登录成功
"""
print("正在使用Cookie登录...")
logger.info("正在使用Cookie登录...")

# 加载Cookie
if not await self.load_cookies(cookie_file):
print("\n❌ Cookie加载失败!")
logger.error("\n❌ Cookie加载失败!")
return False
# 检查登录状态
return await self.check_login_status(base_url)

@exception_context("检查登录状态")
async def check_login_status(self, base_url: str) -> bool:
"""
检查登录状态是否有效
Expand All @@ -117,19 +119,20 @@ async def check_login_status(self, base_url: str) -> bool:
# 判断是否重定向到了不同的页面
current_parsed = urlparse(current_url)
base_parsed = urlparse(base_url)

# Compare scheme, netloc, and path (ignoring query params and fragments)
if (current_parsed.scheme != base_parsed.scheme or
if (current_parsed.scheme != base_parsed.scheme or
current_parsed.netloc != base_parsed.netloc or
current_parsed.path.rstrip('/') != base_parsed.path.rstrip('/')):
print(f"❌ Cookie登录失败! 页面被重定向到: {current_url}")
print("💡 Cookie可能已过期,请重新获取Cookie")
logger.error(f"❌ Cookie登录失败! 页面被重定向到: {current_url}")
logger.info("💡 Cookie可能已过期,请重新获取Cookie")
return False

print(f"✓ Cookie登录成功,当前页面: {self.page.url}")
logger.info(f"✓ Cookie登录成功,当前页面: {self.page.url}")
return True

async def interactive_login_and_save_cookies(self,
@exception_context("交互式登录并保存Cookie")
async def interactive_login_and_save_cookies(self,
login_url: str,
base_url: str,
sso_index_url: str,
Expand All @@ -141,47 +144,44 @@ async def interactive_login_and_save_cookies(self,
:param cookie_file: Cookie文件路径
:return: 是否成功登录并保存Cookie
"""
print("🌐 正在打开登录页面...")
logger.info("🌐 正在打开登录页面...")
await self.page.goto(login_url, wait_until='networkidle')
await self.page.set_viewport_size({"width": 800, "height": 600})
print(f"✅ 登录页面已打开: {login_url}")
print("📝 请在浏览器中完成登录操作")
logger.info(f"✅ 登录页面已打开: {login_url}")
logger.info("📝 请在浏览器中完成登录操作")
await asyncio.get_running_loop().run_in_executor(None, input, "🔑 登录完成后,请按回车键继续...")
# 先前往 SSO 主页
await self.page.goto(sso_index_url)
print("🔍 尝试获取cookie...")
try:
# 查找文本为"砺儒云课堂"的a标签
li_ru_link = self.page.get_by_text("砺儒云课堂")
if await li_ru_link.count() > 0:
# 使用 context.expect_popup() 来捕捉点击后产生的新页面
async with self.page.expect_popup() as popup_info:
await li_ru_link.first.click()

# 这里的 moodle_page 就是新打开的那个标签页
moodle_page = await popup_info.value

# 等待新页面加载完成
await moodle_page.wait_for_load_state()
print("✅ 成功跳转到目标页面")
else:
print("⚠️ 未找到'砺儒云课堂'链接")
except Exception as e:
print(f"⚠️ 点击'砺儒云课堂'链接时出错: {e}")
logger.info("🔍 尝试获取cookie...")
# 查找文本为"砺儒云课堂"的a标签
li_ru_link = self.page.get_by_text("砺儒云课堂")
if await li_ru_link.count() > 0:
# 使用 context.expect_popup() 来捕捉点击后产生的新页面
async with self.page.expect_popup() as popup_info:
await li_ru_link.first.click()

# 这里的 moodle_page 就是新打开的那个标签页
moodle_page = await popup_info.value

# 等待新页面加载完成
await moodle_page.wait_for_load_state()
logger.info("✅ 成功跳转到目标页面")
else:
logger.warning("⚠️ 未找到'砺儒云课堂'链接")
# 验证Cookie是否有效
print("🔍 验证登录状态...")
logger.info("🔍 验证登录状态...")
if await self.check_login_status(base_url):
print("✅ 登录验证成功!")
logger.info("✅ 登录验证成功!")
else:
while not await self.check_login_status(base_url):
print("❌ 登录验证失败!")
logger.error("❌ 登录验证失败!")
loop = asyncio.get_running_loop()
retry = await loop.run_in_executor(None, input, "是否重试?(y/n): ")
if retry.strip().lower() not in ('y', 'yes'):
return False
print("✅ 登录验证成功!")
logger.info("✅ 登录验证成功!")

# 保存当前浏览器的Cookie
await self.save_cookies(cookie_file)
print(f"✅ Cookie已保存到: {cookie_file}")
logger.info(f"✅ Cookie已保存到: {cookie_file}")
return True
7 changes: 5 additions & 2 deletions automation/browser.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
"""

from playwright.async_api import async_playwright, Browser, BrowserContext, Page
from logger import get_logger

logger = get_logger("automation.browser")


class BrowserManager:
Expand Down Expand Up @@ -38,13 +41,13 @@ async def setup(self):
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
)
self.page = await self.context.new_page()
print("✓ 浏览器启动成功 (已静音)")
logger.info("✓ 浏览器启动成功 (已静音)")

async def close(self):
"""关闭浏览器"""
if self.browser:
await self.browser.close()
print("\n✓ 浏览器已关闭")
logger.info("\n✓ 浏览器已关闭")

def get_page(self) -> Page:
"""获取当前页面对象"""
Expand Down
68 changes: 68 additions & 0 deletions automation/exception_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from functools import wraps
import asyncio


class BrowserClosedError(Exception):
"""用户手动关闭浏览器时抛出的异常"""

pass


def _is_browser_closed_error(e: BaseException) -> bool:
"""检查是否为浏览器关闭相关的错误"""
if isinstance(e, BrowserClosedError):
return True
# 检查异常链中是否包含浏览器关闭错误
if e.__cause__ and _is_browser_closed_error(e.__cause__):
return True
# 检查错误消息
# 不依赖 Playwright 的内部异常类型;用消息匹配即可覆盖“目标已关闭”等场景。
error_msg = (
(str(e) or "").lower()
+ " "
+ ("".join(map(str, getattr(e, "args", ()))) or "").lower()
)
if any(
keyword in error_msg
for keyword in [
"target page, context or browser has been closed",
"browser has been closed",
]
):
return True
return False
Comment on lines +11 to +33
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Include __context__ when walking the exception chain.

Exceptions raised without from store the original in __context__, so browser-closed errors can be missed. Consider checking it alongside __cause__.

🔧 Suggested update
     # 检查异常链中是否包含浏览器关闭错误
     if e.__cause__ and _is_browser_closed_error(e.__cause__):
         return True
+    if e.__context__ and _is_browser_closed_error(e.__context__):
+        return True
🧰 Tools
🪛 Ruff (0.14.14)

19-19: Comment contains ambiguous (FULLWIDTH SEMICOLON). Did you mean ; (SEMICOLON)?

(RUF003)

🤖 Prompt for AI Agents
In `@automation/exception_context.py` around lines 11 - 33, The helper
_is_browser_closed_error currently walks the exception chain via __cause__ but
misses exceptions linked through __context__; update _is_browser_closed_error to
also inspect e.__context__ (in addition to e.__cause__) by recursively calling
_is_browser_closed_error on e.__context__ when present (and keep the existing
__cause__ check), so browser-closed errors propagated via implicit context are
detected; ensure you reuse the same function name _is_browser_closed_error for
the recursion to avoid duplicating logic.



def exception_context(step_name):
def decorator(func):
# 检测是否为异步函数
if asyncio.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
# 检查是否为浏览器关闭错误
if _is_browser_closed_error(e):
raise BrowserClosedError("用户已关闭浏览器") from None
# 统一包装异常
raise RuntimeError(f"{step_name}时发生异常") from e

return async_wrapper
else:

@wraps(func)
def sync_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
# 检查是否为浏览器关闭错误
if _is_browser_closed_error(e):
raise BrowserClosedError("用户已关闭浏览器") from None
# 统一包装异常
raise RuntimeError(f"{step_name}时发生异常") from e

return sync_wrapper

return decorator
Loading