diff --git a/.gitignore b/.gitignore index 98f26b1..0d06c93 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,6 @@ Thumbs.db # LLM .claude/ + +# Logs +log/ diff --git a/automation/auth.py b/automation/auth.py index 3533c1a..a8193ea 100644 --- a/automation/auth.py +++ b/automation/auth.py @@ -9,6 +9,10 @@ from typing import Optional from playwright.async_api import Page, BrowserContext from urllib.parse import urlparse +from .exception_context import exception_context +from logger import get_logger + +logger = get_logger("automation.auth") class AuthManager: @@ -23,6 +27,7 @@ def __init__(self, page: Page, context: BrowserContext): self.page = page self.context = context + @exception_context("加载Cookie") async def load_cookies(self, cookie_file: str = "cookies.json") -> bool: """ 从文件加载Cookie到浏览器 @@ -31,19 +36,16 @@ async def load_cookies(self, cookie_file: str = "cookies.json") -> bool: """ cookie_path = Path(cookie_file) if not cookie_path.exists(): - print(f"⚠ Cookie文件不存在: {cookie_file}") + logger.warning(f"⚠ Cookie文件不存在: {cookie_file}") return False - try: - with open(cookie_file, 'r', encoding='utf-8') as f: - cookies = json.load(f) - await self.context.add_cookies(cookies) - print(f"✓ Cookie已从文件加载: {cookie_file}") - return True - except Exception as e: - print(f"⚠ 加载Cookie失败: {e}") - return False + with open(cookie_file, 'r', encoding='utf-8') as f: + cookies = json.load(f) + await self.context.add_cookies(cookies) + logger.info(f"✓ Cookie已从文件加载: {cookie_file}") + return True + @exception_context("保存Cookie") async def save_cookies(self, cookie_file: str = "cookies.json"): """ 保存当前浏览器的Cookie到文件 @@ -52,8 +54,9 @@ async def save_cookies(self, cookie_file: str = "cookies.json"): cookies = await self.context.cookies() with open(cookie_file, 'w', encoding='utf-8') as f: json.dump(cookies, f, indent=2, ensure_ascii=False) - print(f"✓ Cookie已保存到: {cookie_file}") + logger.info(f"✓ Cookie已保存到: {cookie_file}") + @exception_context("刷新Cookie") async def refresh_cookies(self, cookie_file: str = "cookies.json"): """ 刷新并保存当前浏览器的Cookie到文件 @@ -63,28 +66,26 @@ async def refresh_cookies(self, cookie_file: str = "cookies.json"): # 检查按钮是否存在 if await refresh_button.count() > 0: - print("✓ 检测到延长会话按钮,正在点击以刷新Cookie...") + logger.info("✓ 检测到延长会话按钮,正在点击以刷新Cookie...") await refresh_button.click() await asyncio.sleep(1) # 等待cookie更新 await self.save_cookies(cookie_file) await self.load_cookies(cookie_file) + @exception_context("检查Cookie有效性") async def check_cookie_validity(self) -> bool: """ 检查Cookie是否有效 通过检查页面内容是否包含"访客不能访问此课程"来判断 :return: True表示Cookie有效,False表示Cookie已失效 """ - try: - page_content = await self.page.content() - if "访客不能访问此课程" in page_content: - print("❌ 检测到Cookie已失效") - return False - return True - except Exception as e: - print(f"⚠ Cookie有效性检测出错: {e}") - return True # 检测失败时默认认为有效,避免误判 + page_content = await self.page.content() + if "访客不能访问此课程" in page_content: + logger.error("❌ 检测到Cookie已失效") + return False + return True + @exception_context("使用Cookie登录") async def login_with_cookies(self, base_url: str, cookie_file: str = "cookies.json") -> bool: """ 使用Cookie登录 @@ -92,15 +93,16 @@ async def login_with_cookies(self, base_url: str, cookie_file: str = "cookies.js :param cookie_file: Cookie文件路径 :return: 是否登录成功 """ - print("正在使用Cookie登录...") + logger.info("正在使用Cookie登录...") # 加载Cookie if not await self.load_cookies(cookie_file): - print("\n❌ Cookie加载失败!") + logger.error("\n❌ Cookie加载失败!") return False # 检查登录状态 return await self.check_login_status(base_url) + @exception_context("检查登录状态") async def check_login_status(self, base_url: str) -> bool: """ 检查登录状态是否有效 @@ -117,19 +119,20 @@ async def check_login_status(self, base_url: str) -> bool: # 判断是否重定向到了不同的页面 current_parsed = urlparse(current_url) base_parsed = urlparse(base_url) - + # Compare scheme, netloc, and path (ignoring query params and fragments) - if (current_parsed.scheme != base_parsed.scheme or + if (current_parsed.scheme != base_parsed.scheme or current_parsed.netloc != base_parsed.netloc or current_parsed.path.rstrip('/') != base_parsed.path.rstrip('/')): - print(f"❌ Cookie登录失败! 页面被重定向到: {current_url}") - print("💡 Cookie可能已过期,请重新获取Cookie") + logger.error(f"❌ Cookie登录失败! 页面被重定向到: {current_url}") + logger.info("💡 Cookie可能已过期,请重新获取Cookie") return False - print(f"✓ Cookie登录成功,当前页面: {self.page.url}") + logger.info(f"✓ Cookie登录成功,当前页面: {self.page.url}") return True - async def interactive_login_and_save_cookies(self, + @exception_context("交互式登录并保存Cookie") + async def interactive_login_and_save_cookies(self, login_url: str, base_url: str, sso_index_url: str, @@ -141,47 +144,44 @@ async def interactive_login_and_save_cookies(self, :param cookie_file: Cookie文件路径 :return: 是否成功登录并保存Cookie """ - print("🌐 正在打开登录页面...") + logger.info("🌐 正在打开登录页面...") await self.page.goto(login_url, wait_until='networkidle') await self.page.set_viewport_size({"width": 800, "height": 600}) - print(f"✅ 登录页面已打开: {login_url}") - print("📝 请在浏览器中完成登录操作") + logger.info(f"✅ 登录页面已打开: {login_url}") + logger.info("📝 请在浏览器中完成登录操作") await asyncio.get_running_loop().run_in_executor(None, input, "🔑 登录完成后,请按回车键继续...") # 先前往 SSO 主页 await self.page.goto(sso_index_url) - print("🔍 尝试获取cookie...") - try: - # 查找文本为"砺儒云课堂"的a标签 - li_ru_link = self.page.get_by_text("砺儒云课堂") - if await li_ru_link.count() > 0: - # 使用 context.expect_popup() 来捕捉点击后产生的新页面 - async with self.page.expect_popup() as popup_info: - await li_ru_link.first.click() - - # 这里的 moodle_page 就是新打开的那个标签页 - moodle_page = await popup_info.value - - # 等待新页面加载完成 - await moodle_page.wait_for_load_state() - print("✅ 成功跳转到目标页面") - else: - print("⚠️ 未找到'砺儒云课堂'链接") - except Exception as e: - print(f"⚠️ 点击'砺儒云课堂'链接时出错: {e}") + logger.info("🔍 尝试获取cookie...") + # 查找文本为"砺儒云课堂"的a标签 + li_ru_link = self.page.get_by_text("砺儒云课堂") + if await li_ru_link.count() > 0: + # 使用 context.expect_popup() 来捕捉点击后产生的新页面 + async with self.page.expect_popup() as popup_info: + await li_ru_link.first.click() + + # 这里的 moodle_page 就是新打开的那个标签页 + moodle_page = await popup_info.value + + # 等待新页面加载完成 + await moodle_page.wait_for_load_state() + logger.info("✅ 成功跳转到目标页面") + else: + logger.warning("⚠️ 未找到'砺儒云课堂'链接") # 验证Cookie是否有效 - print("🔍 验证登录状态...") + logger.info("🔍 验证登录状态...") if await self.check_login_status(base_url): - print("✅ 登录验证成功!") + logger.info("✅ 登录验证成功!") else: while not await self.check_login_status(base_url): - print("❌ 登录验证失败!") + logger.error("❌ 登录验证失败!") loop = asyncio.get_running_loop() retry = await loop.run_in_executor(None, input, "是否重试?(y/n): ") if retry.strip().lower() not in ('y', 'yes'): return False - print("✅ 登录验证成功!") + logger.info("✅ 登录验证成功!") # 保存当前浏览器的Cookie await self.save_cookies(cookie_file) - print(f"✅ Cookie已保存到: {cookie_file}") + logger.info(f"✅ Cookie已保存到: {cookie_file}") return True diff --git a/automation/browser.py b/automation/browser.py index b58611e..b0b82b7 100644 --- a/automation/browser.py +++ b/automation/browser.py @@ -4,6 +4,9 @@ """ from playwright.async_api import async_playwright, Browser, BrowserContext, Page +from logger import get_logger + +logger = get_logger("automation.browser") class BrowserManager: @@ -38,13 +41,13 @@ async def setup(self): user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' ) self.page = await self.context.new_page() - print("✓ 浏览器启动成功 (已静音)") + logger.info("✓ 浏览器启动成功 (已静音)") async def close(self): """关闭浏览器""" if self.browser: await self.browser.close() - print("\n✓ 浏览器已关闭") + logger.info("\n✓ 浏览器已关闭") def get_page(self) -> Page: """获取当前页面对象""" diff --git a/automation/exception_context.py b/automation/exception_context.py new file mode 100644 index 0000000..7afbf97 --- /dev/null +++ b/automation/exception_context.py @@ -0,0 +1,68 @@ +from functools import wraps +import asyncio + + +class BrowserClosedError(Exception): + """用户手动关闭浏览器时抛出的异常""" + + pass + + +def _is_browser_closed_error(e: BaseException) -> bool: + """检查是否为浏览器关闭相关的错误""" + if isinstance(e, BrowserClosedError): + return True + # 检查异常链中是否包含浏览器关闭错误 + if e.__cause__ and _is_browser_closed_error(e.__cause__): + return True + # 检查错误消息 + # 不依赖 Playwright 的内部异常类型;用消息匹配即可覆盖“目标已关闭”等场景。 + error_msg = ( + (str(e) or "").lower() + + " " + + ("".join(map(str, getattr(e, "args", ()))) or "").lower() + ) + if any( + keyword in error_msg + for keyword in [ + "target page, context or browser has been closed", + "browser has been closed", + ] + ): + return True + return False + + +def exception_context(step_name): + def decorator(func): + # 检测是否为异步函数 + if asyncio.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + try: + return await func(*args, **kwargs) + except Exception as e: + # 检查是否为浏览器关闭错误 + if _is_browser_closed_error(e): + raise BrowserClosedError("用户已关闭浏览器") from None + # 统一包装异常 + raise RuntimeError(f"{step_name}时发生异常") from e + + return async_wrapper + else: + + @wraps(func) + def sync_wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except Exception as e: + # 检查是否为浏览器关闭错误 + if _is_browser_closed_error(e): + raise BrowserClosedError("用户已关闭浏览器") from None + # 统一包装异常 + raise RuntimeError(f"{step_name}时发生异常") from e + + return sync_wrapper + + return decorator diff --git a/automation/video.py b/automation/video.py index 1f60597..fc50e3a 100644 --- a/automation/video.py +++ b/automation/video.py @@ -5,10 +5,20 @@ import asyncio from typing import List, Optional -from playwright.async_api import Page -from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn +from playwright.async_api import Page, TimeoutError as PlaywrightTimeoutError +from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, + BarColumn, + TaskProgressColumn, + TimeElapsedColumn, +) from rich.console import Console +from .exception_context import exception_context, BrowserClosedError +from logger import get_logger +logger = get_logger("automation.video") console = Console() @@ -40,98 +50,92 @@ def __init__(self, page: Page, auth_manager): self.page = page self.auth_manager = auth_manager - async def ensure_video_playing(self, video_selector: str = "video") -> dict: + @exception_context("确保视频播放") + async def ensure_video_playing( + self, video_selector: str = "video" + ) -> Optional[dict]: """ 确保视频正在播放,如果暂停则自动恢复,并返回视频状态 :param video_selector: 视频元素的CSS选择器 :return: 包含视频状态的字典 {paused, currentTime, duration, ended},获取失败返回 None """ - try: - video = self.page.locator(video_selector) - if await video.count() == 0: - return None - - # 获取视频状态 - video_state = await video.evaluate(""" - el => ({ - paused: el.paused, - currentTime: el.currentTime, - duration: el.duration, - ended: el.ended - }) - """) - - # 如果视频暂停了(且未播放完毕),自动恢复播放 - if video_state.get('paused') and not video_state.get('ended'): - console.print("\n[yellow]⚠️ 检测到视频已暂停,正在自动恢复播放...[/yellow]") - await video.evaluate("el => el.play()") - console.print("[green]✓ 视频已恢复播放[/green]") - - return video_state - - except Exception as e: - console.print(f"\n[red]❌ 检测视频播放状态时出现异常: {e}[/red]") + video = self.page.locator(video_selector) + if await video.count() == 0: return None - async def check_browser_closed(self): + # 获取视频状态 + video_state = await video.evaluate(""" + el => ({ + paused: el.paused, + currentTime: el.currentTime, + duration: el.duration, + ended: el.ended + }) + """) + + # 如果视频暂停了(且未播放完毕),自动恢复播放 + if video_state.get("paused") and not video_state.get("ended"): + logger.warning("⚠️ 检测到视频已暂停,正在自动恢复播放...") + await video.evaluate("el => el.play()") + logger.info("✓ 视频已恢复播放") + + return video_state + + @exception_context("检查页面状态") + async def check_page_closed(self): """ - 检查浏览器是否已被用户手动关闭 - 如果浏览器已关闭,打印提示信息并抛出异常 - 如果浏览器正常运行,静默返回 + 检查页面是否已被用户手动关闭 + 如果页面已关闭,打印提示信息并抛出异常 + 如果页面正常,静默返回 """ - try: - # 检查页面是否已关闭 - if self.page.is_closed(): - print("\n⚠️ 检测到浏览器已被手动关闭") - print("💡 程序即将退出") - raise Exception("浏览器已被用户手动关闭") - # 尝试获取页面标题来验证页面是否仍然可访问 - await self.page.title() - except Exception as e: - # 如果是我们自己抛出的异常,直接传递 - if "浏览器已被用户手动关闭" in str(e): - raise - # 其他异常也视为浏览器已关闭 - print("\n⚠️ 检测到浏览器已被手动关闭") - print("💡 程序即将退出") - raise Exception("浏览器已被用户手动关闭") - - async def get_video_links_by_pattern(self, page_url: str, url_pattern: str) -> List[str]: + # 检查页面是否已关闭 + if self.page.is_closed(): + logger.warning("\n⚠️ 检测到页面已被手动关闭") + logger.info("💡 程序即将退出") + raise BrowserClosedError("页面已被用户手动关闭") + + @exception_context("获取视频链接") + async def get_video_links_by_pattern( + self, page_url: str, url_pattern: str + ) -> List[str]: """ 通过URL模式匹配获取视频链接 :param page_url: 包含视频链接的页面URL :param url_pattern: 视频链接的URL模式(如 "https://example.com/mod/fsresource/view.php?id=") :return: 视频链接列表 """ - print(f"\n正在访问视频列表页面: {page_url}") - await self.page.goto(page_url, wait_until='networkidle') + logger.info(f"\n正在访问视频列表页面: {page_url}") + await self.page.goto(page_url, wait_until="networkidle") # 等待页面加载完成 await asyncio.sleep(2) # 获取所有链接 links = await self.page.locator(f'a[href*="{url_pattern}"]').evaluate_all( - 'elements => elements.map(e => e.href)' + "elements => elements.map(e => e.href)" ) # 去重并排序 links = sorted(list(set(links))) - print(f"✓ 找到 {len(links)} 个匹配的视频链接") + logger.info(f"✓ 找到 {len(links)} 个匹配的视频链接") # 打印前5个链接作为示例 if links: - print("\n示例链接:") + logger.info("\n示例链接:") for i, link in enumerate(links[:5], 1): - print(f" {i}. {link}") + logger.info(f" {i}. {link}") if len(links) > 5: - print(f" ... 还有 {len(links) - 5} 个链接") + logger.info(f" ... 还有 {len(links) - 5} 个链接") else: - print(f"\n⚠ 未找到匹配模式 '{url_pattern}' 的链接") - print("💡 提示: 检查 URL_PATTERN 配置是否正确") + logger.warning(f"\n⚠ 未找到匹配模式 '{url_pattern}' 的链接") + logger.info("💡 提示: 检查 URL_PATTERN 配置是否正确") return links - async def get_video_duration(self, video_selector: str = "video") -> Optional[float]: + @exception_context("获取视频时长") + async def get_video_duration( + self, video_selector: str = "video" + ) -> Optional[float]: """ 获取视频时长(秒) :param video_selector: 视频元素的CSS选择器 @@ -145,19 +149,27 @@ async def get_video_duration(self, video_selector: str = "video") -> Optional[fl duration = await video.evaluate("el => el.duration || null") if duration: - print(f"✓ 视频时长: {self.format_time(duration)}") + logger.info(f"✓ 视频时长: {self.format_time(duration)}") return duration else: - print("⚠ 无法获取视频时长,可能并非视频页,将在默认等待时间后跳转下一链接") + logger.warning( + "⚠ 无法获取视频时长,可能并非视频页,将在默认等待时间后跳转下一链接" + ) return None - except Exception as e: - print(f"⚠ 获取视频时长失败: {e}") + except PlaywrightTimeoutError: + # 视频元素不存在是预期行为(可能不是视频页) + logger.warning("⚠ 未找到视频元素,可能并非视频页") return None - async def play_video(self, video_url: str, video_selector: str = "video", - play_button_selector: Optional[str] = None, - default_wait_time: int = 60): + @exception_context("播放视频并等待完成") + async def play_video( + self, + video_url: str, + video_selector: str = "video", + play_button_selector: Optional[str] = None, + default_wait_time: int = 60, + ): """ 播放视频并等待播放完成 :param video_url: 视频页面URL @@ -165,22 +177,22 @@ async def play_video(self, video_url: str, video_selector: str = "video", :param play_button_selector: 播放按钮的CSS选择器(如果需要手动点击播放) :param default_wait_time: 如果无法获取视频时长,使用的默认等待时间(秒) """ - print(f"\n{'='*60}") - print(f"正在访问视频页面: {video_url}") - await self.page.goto(video_url, wait_until='networkidle') + logger.info(f"\n{'=' * 60}") + logger.info(f"正在访问视频页面: {video_url}") + await self.page.goto(video_url, wait_until="networkidle") # 等待页面加载 await asyncio.sleep(2) # 检查浏览器是否已关闭 - await self.check_browser_closed() + await self.check_page_closed() # 尝试自动延长会话 await self.auth_manager.refresh_cookies() # 检查Cookie是否有效 if not await self.auth_manager.check_cookie_validity(): - print("⚠ Cookie已失效,停止观看视频") + logger.warning("⚠ Cookie已失效,停止观看视频") raise Exception("Cookie已失效,请重新获取Cookie") # 检查视频是否已完成 @@ -189,72 +201,73 @@ async def play_video(self, video_url: str, video_selector: str = "video", # 获取文字内容 text = await tips_locator.text_content() if text and "已完成" in text.strip(): - print("✓ 该视频已标记为完成,跳过观看") + logger.info("✓ 该视频已标记为完成,跳过观看") return # 如果需要点击播放按钮 if play_button_selector: try: - await self.page.wait_for_selector(play_button_selector, timeout=5000) - await self.page.click(play_button_selector) - print("✓ 已点击播放按钮") - except: - print("⚠ 未找到播放按钮,可能并非视频页,即将自动跳转下一链接") + await self.page.click(play_button_selector, timeout=5000) + logger.info("✓ 已点击播放按钮") + except PlaywrightTimeoutError: + logger.warning("⚠ 未找到播放按钮,可能并非视频页,即将自动跳转下一链接") return # 智能计算视频剩余时间 duration = None - try: - # 获取视频总时长 - video_duration = await self.get_video_duration(video_selector) + # 获取视频总时长 + video_duration = await self.get_video_duration(video_selector) - if video_duration is None: - print("⚠ 无法获取视频总时长") - else: - # 尝试获取已观看时长 - watched_locator = self.page.locator(".num-gksc > span") - - if await watched_locator.count() > 0: - watched_text = await watched_locator.text_content() - - if watched_text: - # 尝试解析已观看时长(去除空格和可能的单位) - watched_text = watched_text.strip() - try: - watched_duration = float(watched_text) - - # 计算剩余时间 - remaining = video_duration - watched_duration - - if remaining < 0: - print(f"⚠ 已观看时长({self.format_time(watched_duration)}) 大于总时长({self.format_time(video_duration)}),视频可能已完成") - duration = 0 # 视频已完成,无需等待 - elif remaining == 0: - print("✓ 视频已观看完毕") - duration = 0 - else: - duration = remaining - print(f"✓ 总时长: {self.format_time(video_duration)}, 已观看: {self.format_time(watched_duration)}, 剩余: {self.format_time(duration)}") - except ValueError: - print(f"⚠ 无法解析已观看时长: '{watched_text}', 使用视频总时长") - duration = video_duration - else: - print("⚠ 已观看时长元素为空,使用视频总时长") + if video_duration is None: + logger.warning("⚠ 无法获取视频总时长") + else: + # 尝试获取已观看时长 + watched_locator = self.page.locator(".num-gksc > span") + + if await watched_locator.count() > 0: + watched_text = await watched_locator.text_content() + + if watched_text: + # 尝试解析已观看时长(去除空格和可能的单位) + watched_text = watched_text.strip() + try: + watched_duration = float(watched_text) + + # 计算剩余时间 + remaining = video_duration - watched_duration + + if remaining < 0: + logger.warning( + f"⚠ 已观看时长({self.format_time(watched_duration)}) 大于总时长({self.format_time(video_duration)}),视频可能已完成" + ) + duration = 0 # 视频已完成,无需等待 + elif remaining == 0: + logger.info("✓ 视频已观看完毕") + duration = 0 + else: + duration = remaining + logger.info( + f"✓ 总时长: {self.format_time(video_duration)}, 已观看: {self.format_time(watched_duration)}, 剩余: {self.format_time(duration)}" + ) + except ValueError: + # 数据解析失败是预期行为,使用降级方案 + logger.warning( + f"⚠ 无法解析已观看时长: '{watched_text}', 使用视频总时长" + ) duration = video_duration else: - print("⚠ 未找到已观看时长元素,使用视频总时长") + logger.warning("⚠ 已观看时长元素为空,使用视频总时长") duration = video_duration - - except Exception as e: - print(f"⚠ 计算剩余时间时出错: {e}") - duration = None + else: + logger.warning("⚠ 未找到已观看时长元素,使用视频总时长") + duration = video_duration # 根据计算结果等待 if duration is not None and duration > 0: # 等待视频播放完成 max_wait_time = duration + 60 # 最大等待时间,防止无限循环 - console.print(f"[cyan]⏳ 等待视频播放完成(预计 {self.format_time(duration)})...[/cyan]") + logger.info(f"⏳ 等待视频播放完成(预计 {self.format_time(duration)})...") # 使用 rich 进度条显示播放进度 with Progress( @@ -275,19 +288,25 @@ async def play_video(self, video_url: str, video_selector: str = "video", elapsed += 5 # 检查浏览器是否已关闭 - await self.check_browser_closed() + await self.check_page_closed() # 检查视频状态并恢复播放 video_state = await self.ensure_video_playing(video_selector) if video_state: - current_time = video_state.get('currentTime', 0) - video_duration = video_state.get('duration', 0) - ended = video_state.get('ended', False) + current_time = video_state.get("currentTime", 0) + video_duration = video_state.get("duration", 0) + ended = video_state.get("ended", False) # 视频已播放完毕 - if ended or (video_duration > 0 and current_time >= video_duration - 1): - progress.update(task, completed=100, description="[green]播放完毕[/green]") + if ended or ( + video_duration > 0 and current_time >= video_duration - 1 + ): + progress.update( + task, + completed=100, + description="[green]播放完毕[/green]", + ) break # 更新进度条 @@ -296,36 +315,43 @@ async def play_video(self, video_url: str, video_selector: str = "video", progress.update( task, completed=percent, - description=f"[cyan]{self.format_time(current_time)}[/cyan]/[dim]{self.format_time(video_duration)}[/dim]" + description=f"[cyan]{self.format_time(current_time)}[/cyan]/[dim]{self.format_time(video_duration)}[/dim]", ) else: # 无法获取视频状态时 - progress.update(task, description=f"[yellow]等待中 {self.format_time(elapsed)}[/yellow]") + progress.update( + task, + description=f"[yellow]等待中 {self.format_time(elapsed)}[/yellow]", + ) # 尝试自动延长会话 await self.auth_manager.refresh_cookies() # 检查Cookie是否有效 if not await self.auth_manager.check_cookie_validity(): - console.print("[red]⚠ Cookie已失效,停止观看视频[/red]") + logger.error("⚠ Cookie已失效,停止观看视频") raise Exception("Cookie已失效,请重新获取Cookie") - console.print(f"[green]✓ 视频播放完毕[/green]") + logger.info("✓ 视频播放完毕") elif duration == 0: # 视频已完成,无需等待 - print("✓ 视频无需等待") + logger.info("✓ 视频无需等待") else: # 使用默认等待时间 - print("⚠ 无法获取视频时长,使用默认等待时间...") - print(f"⏳ 等待 {self.format_time(default_wait_time)}...") + logger.warning("⚠ 无法获取视频时长,使用默认等待时间...") + logger.info(f"⏳ 等待 {self.format_time(default_wait_time)}...") await asyncio.sleep(default_wait_time) - print("✓ 视频播放完成") + logger.info("✓ 视频播放完成") - async def watch_videos(self, video_links: List[str], - video_selector: str = "video", - play_button_selector: Optional[str] = None, - default_wait_time: int = 60): + @exception_context("批量观看视频") + async def watch_videos( + self, + video_links: List[str], + video_selector: str = "video", + play_button_selector: Optional[str] = None, + default_wait_time: int = 60, + ): """ 批量观看视频 :param video_links: 视频链接列表 @@ -333,23 +359,20 @@ async def watch_videos(self, video_links: List[str], :param play_button_selector: 播放按钮的CSS选择器 :param default_wait_time: 默认等待时间(秒) """ - print(f"\n开始观看 {len(video_links)} 个视频") + logger.info(f"\n开始观看 {len(video_links)} 个视频") for i, link in enumerate(video_links, 1): # 检查浏览器是否已关闭 - await self.check_browser_closed() + await self.check_page_closed() - print(f"\n[{i}/{len(video_links)}] 当前视频:") + logger.info(f"\n[{i}/{len(video_links)}] 当前视频:") await self.play_video( - link, - video_selector, - play_button_selector, - default_wait_time + link, video_selector, play_button_selector, default_wait_time ) # 视频之间暂停2秒 if i < len(video_links): await asyncio.sleep(2) - print(f"\n{'='*60}") - print(f"✓ 所有视频观看完成! 共完成 {len(video_links)} 个视频") + logger.info(f"\n{'=' * 60}") + logger.info(f"✓ 所有视频观看完成! 共完成 {len(video_links)} 个视频") diff --git a/cookie_fix.py b/cookie_fix.py index d06887d..a6538d1 100644 --- a/cookie_fix.py +++ b/cookie_fix.py @@ -1,53 +1,59 @@ # convert_cookies.py import json +from logger import get_logger +from automation.exception_context import exception_context +logger = get_logger("cookie_fix") + + +@exception_context("Cookie转换") def cookie_fix(): + # 从CLI读取浏览器导出的Cookie - 保留 print 用于用户交互 + print("请粘贴浏览器导出的Cookie JSON (连续敲击两次回车(Enter)结束输入):") + lines = list(iter(input, "")) + content = "\n".join(lines) + + if content == "": + logger.error("✗ 输入为空,请检查输入内容") + return False try: - # 从CLI读取浏览器导出的Cookie - print("请粘贴浏览器导出的Cookie JSON (连续敲击两次回车(Enter)结束输入):") - lines = list(iter(input, '')) - content = '\n'.join(lines) - - if content == '': - print("✗ 输入为空,请检查输入内容") - return False browser_cookies = json.loads(content) + except (json.JSONDecodeError, ValueError) as e: + logger.error(f"✗ Cookie JSON 解析失败: {e}") + return False + + # 转换为Playwright格式 + playwright_cookies = [] + for cookie in browser_cookies: + # 处理 sameSite: null/空字符串 -> 'Lax', 其他值标准化首字母大写 + same_site = cookie.get("sameSite") + if same_site is None or same_site == "": + same_site = "Lax" + elif isinstance(same_site, str): + # 标准化为首字母大写格式 + same_site_lower = same_site.lower() + if same_site_lower in ["lax", "strict", "none"]: + same_site = same_site_lower.capitalize() + elif same_site_lower in ["unspecified", "no_restriction"]: + same_site = "Lax" + else: + same_site = "Lax" + + playwright_cookie = { + "name": cookie.get("name"), + "value": cookie.get("value"), + "domain": cookie.get("domain"), + "path": cookie.get("path") or "/", + "expires": cookie.get("expirationDate", -1), + "httpOnly": cookie.get("httpOnly", False), + "secure": cookie.get("secure", False), + "sameSite": same_site, + } + playwright_cookies.append(playwright_cookie) + + # 保存 + with open("cookies.json", "w", encoding="utf-8") as f: + json.dump(playwright_cookies, f, indent=2, ensure_ascii=False) - # 转换为Playwright格式 - playwright_cookies = [] - for cookie in browser_cookies: - # 处理 sameSite: null/空字符串 -> 'Lax',其他值标准化首字母大写 - same_site = cookie.get('sameSite') - if same_site is None or same_site == '': - same_site = 'Lax' - elif isinstance(same_site, str): - # 标准化为首字母大写格式 - same_site_lower = same_site.lower() - if same_site_lower in ['lax', 'strict', 'none']: - same_site = same_site_lower.capitalize() - elif same_site_lower in ['unspecified', 'no_restriction']: - same_site = 'Lax' - else: - same_site = 'Lax' - - playwright_cookie = { - 'name': cookie.get('name'), - 'value': cookie.get('value'), - 'domain': cookie.get('domain'), - 'path': cookie.get('path') or '/', - 'expires': cookie.get('expirationDate', -1), - 'httpOnly': cookie.get('httpOnly', False), - 'secure': cookie.get('secure', False), - 'sameSite': same_site - } - playwright_cookies.append(playwright_cookie) - - # 保存 - with open('cookies.json', 'w', encoding='utf-8') as f: - json.dump(playwright_cookies, f, indent=2, ensure_ascii=False) - - print("✓ Cookie转换完成!") - return True - except Exception as e: - print(f"✗ Cookie转换失败: {e}") - return False \ No newline at end of file + logger.info("✓ Cookie转换完成!") + return True diff --git a/logger.py b/logger.py new file mode 100644 index 0000000..04faa82 --- /dev/null +++ b/logger.py @@ -0,0 +1,85 @@ +import logging +from logging.handlers import RotatingFileHandler +from pathlib import Path +from rich.logging import RichHandler + +# 日志目录 +LOG_DIR = Path(__file__).parent / "log" + +_initialized = False + +# 需要压制的第三方库(它们的 DEBUG 日志太吵了) +NOISY_LOGGERS = [ + "playwright", + "httpx", + "httpcore", + "urllib3", + "asyncio", +] + + +def setup_logging( + log_file: str = "debug.log", + max_bytes: int = 5 * 1024 * 1024, + backup_count: int = 3, +) -> logging.Logger: + """ + 初始化日志系统 + - 文件处理器:使用 RotatingFileHandler 自动轮转,记录所有级别日志 + - 终端处理器:使用 RichHandler 美化 INFO 及以上级别的输出 + + :param log_file: 日志文件名(会自动放到 log 目录下) + :param max_bytes: 单个日志文件最大字节数,默认 5MB + :param backup_count: 保留的备份文件数量,默认 3 个 + """ + global _initialized + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + + # 避免重复添加处理器 + if _initialized: + return logger + + # 确保日志目录存在 + LOG_DIR.mkdir(exist_ok=True) + log_path = LOG_DIR / log_file + + # --- 1. 文件处理器:自动轮转,记录一切 --- + file_handler = RotatingFileHandler( + log_path, + maxBytes=max_bytes, + backupCount=backup_count, + encoding="utf-8", + ) + file_handler.setLevel(logging.DEBUG) + file_fmt = logging.Formatter('%(asctime)s [%(levelname)s] %(name)s: %(message)s') + file_handler.setFormatter(file_fmt) + + # --- 2. 终端处理器:追求极致美观 --- + rich_handler = RichHandler( + show_time=False, + show_path=False, + markup=True, + rich_tracebacks=True, + ) + rich_handler.setLevel(logging.INFO) + + logger.addHandler(file_handler) + logger.addHandler(rich_handler) + + # --- 3. 压制第三方库的噪音日志 --- + for name in NOISY_LOGGERS: + logging.getLogger(name).setLevel(logging.WARNING) + + _initialized = True + return logger + + +def get_logger(name: str | None = None) -> logging.Logger: + """ + 获取指定名称的 logger,自动确保日志系统已初始化 + :param name: logger 名称,通常使用 __name__ + :return: logger 实例 + """ + setup_logging() + return logging.getLogger(name) diff --git a/main.py b/main.py index f5b3ec4..d1dc2b5 100644 --- a/main.py +++ b/main.py @@ -4,12 +4,51 @@ """ import asyncio -import traceback +import sys +import warnings from pathlib import Path from cookie_fix import cookie_fix from automation import BrowserManager, AuthManager, VideoManager +from automation.exception_context import BrowserClosedError +from logger import setup_logging, get_logger import config +# 抑制 asyncio 在 Windows 上关闭时的资源警告 +warnings.filterwarnings("ignore", category=ResourceWarning, message=".*unclosed.*") + + +def _custom_unraisablehook(unraisable): + """自定义 unraisable 异常处理,抑制浏览器关闭时的 asyncio 清理错误""" + # 忽略 asyncio transport 相关的清理错误 + if unraisable.exc_type in (ValueError, OSError): + err_msg = str(unraisable.exc_value).lower() + if any( + keyword in err_msg + for keyword in [ + "i/o operation on closed pipe", + "closed pipe", + "unclosed transport", + ] + ): + return # 静默忽略 + # 其他异常使用默认处理 + sys.__unraisablehook__(unraisable) + + +def _custom_excepthook(exc_type, exc_value, exc_traceback): + """全局异常处理器,将未捕获的异常记录到日志文件""" + # KeyboardInterrupt 不记录日志,只做友好提示 + if issubclass(exc_type, KeyboardInterrupt): + return + # 记录完整的异常信息到日志 + logger.critical("未捕获的异常", exc_info=(exc_type, exc_value, exc_traceback)) + + +sys.unraisablehook = _custom_unraisablehook +sys.excepthook = _custom_excepthook + +logger = get_logger(__name__) + def print_welcome(): """打印欢迎界面""" @@ -34,19 +73,20 @@ def print_welcome(): async def main(): """主函数""" - + # 初始化日志系统 + setup_logging() + # 显示欢迎界面 print_welcome() - + # 从 config.py 读取配置 - print("📦 正在初始化浏览器...") + logger.info("📦 正在初始化浏览器...") browser_manager = None try: # 1. 启动浏览器 browser_manager = BrowserManager( - browser_type=config.BROWSER, - headless=config.HEADLESS + browser_type=config.BROWSER, headless=config.HEADLESS ) await browser_manager.setup() # 2. 初始化认证和视频管理器 @@ -61,62 +101,71 @@ async def main(): cookie_path = Path(config.COOKIE_FILE) # 如果 cookies.json 文件已存在,尝试直接使用已有 Cookies 登录 if cookie_path.exists(): - print(f"📂 检测到已有 Cookie 文件: {config.COOKIE_FILE},尝试直接使用该文件登录...") + logger.info( + f"📂 检测到已有 Cookie 文件: {config.COOKIE_FILE},尝试直接使用该文件登录..." + ) login_success = await auth_manager.login_with_cookies( - config.BASE_URL, - config.COOKIE_FILE + config.BASE_URL, config.COOKIE_FILE ) if not login_success: - print("登录凭证已失效或不存在") - # 选择登录方式 + logger.warning("登录凭证已失效或不存在") + # 选择登录方式 - 保留 print 用于用户交互 print("\n🔐 请选择获取登录凭证(Cookies)的方式:") - print(" 1. 交互式登录(推荐)- 自动打开登录页面,您手动登录后程序自动获取Cookies") - print(" 2. 使用您手动获取的 Cookies 登录 - 在命令行中直接粘贴浏览器导出的 Cookies JSON") - + print( + " 1. 交互式登录(推荐)- 自动打开登录页面,您手动登录后程序自动获取Cookies" + ) + print( + " 2. 使用您手动获取的 Cookies 登录 - 在命令行中直接粘贴浏览器导出的 Cookies JSON" + ) + login_success = False while True: try: loop = asyncio.get_running_loop() - choice = await loop.run_in_executor(None, input, "请输入选择 (1/2,默认为1): ") + choice = await loop.run_in_executor( + None, input, "请输入选择 (1/2,默认为1): " + ) choice = choice.strip() if choice in ("", "1"): # 默认使用交互式登录 - login_success = await auth_manager.interactive_login_and_save_cookies( - config.LOGIN_URL, - config.BASE_URL, - config.SSO_INDEX_URL, - config.COOKIE_FILE + login_success = ( + await auth_manager.interactive_login_and_save_cookies( + config.LOGIN_URL, + config.BASE_URL, + config.SSO_INDEX_URL, + config.COOKIE_FILE, + ) ) break elif choice == "2": # 使用手动导出的 cookies 登录 if cookie_fix(): - print("✓ Cookies 格式化成功") + logger.info("✓ Cookies 格式化成功") login_success = await auth_manager.login_with_cookies( - config.BASE_URL, - config.COOKIE_FILE + config.BASE_URL, config.COOKIE_FILE ) else: - print("⚠ Cookies 格式化失败,请检查输入的 Cookies 内容是否正确,程序即将结束") + logger.error( + "⚠ Cookies 格式化失败,请检查输入的 Cookies 内容是否正确,程序即将结束" + ) break else: print("⚠️ 输入无效,请输入 1 或 2") except KeyboardInterrupt: - print("\n\n程序已由用户中断。") + logger.info("\n\n程序已由用户中断。") return if not login_success: - print("\n❌ 登录失败!") + logger.error("\n❌ 登录失败!") return # 4. 通过URL模式获取视频链接 - print(f"\n正在提取视频链接...") - print(f"URL模式: {config.URL_PATTERN}") + logger.info(f"\n正在提取视频链接...") + logger.info(f"URL模式: {config.URL_PATTERN}") video_links = await video_manager.get_video_links_by_pattern( - config.VIDEO_LIST_URL, - config.URL_PATTERN + config.VIDEO_LIST_URL, config.URL_PATTERN ) # 5. 观看所有视频 @@ -125,36 +174,33 @@ async def main(): video_links, config.VIDEO_ELEMENT_SELECTOR, config.PLAY_BUTTON_SELECTOR, - config.DEFAULT_WAIT_TIME + config.DEFAULT_WAIT_TIME, ) else: - print("❌ 未找到任何视频链接。") + logger.error("❌ 未找到任何视频链接。") suggestions() - except Exception as e: - print(f"\n❌ 发生错误: {e}") - traceback.print_exc() + except BrowserClosedError: + logger.info("\n👋 检测到浏览器已关闭,程序正常退出") + except Exception: + # 只打印一次:RichHandler 会负责美化堆栈,避免和 traceback.print_exc() 重复输出。 + logger.error("\n❌ 发生错误", exc_info=True) suggestions() - finally: - # 6. 关闭浏览器 - if browser_manager: - try: - # 检查浏览器是否仍在运行 - browser = browser_manager.browser - if browser and browser.is_connected(): - input("\n按回车键退出并关闭浏览器...") - await browser_manager.close() - except Exception: - # 浏览器已被手动关闭或其他错误,静默处理 - pass + def suggestions(): - print("\n💡 故障排查建议:") - print(" 1. 检查 config.py 中是否正确配置了课程链接") - print(" 2. 确认 cookies.json 文件存在") - print(" 3. 确认 Cookie 是否有效") - print(" 4. 确认网络状态良好") - print(" 5. 如仍有问题,请提交 issue 至 GitHub 仓库:github.com/YewFence/fly_vedio_assignment_away\n") + logger.info("\n💡 故障排查建议:") + logger.info(" 1. 检查 config.py 中是否正确配置了课程链接") + logger.info(" 2. 确认 cookies.json 文件存在") + logger.info(" 3. 确认 Cookie 是否有效") + logger.info(" 4. 确认网络状态良好") + logger.info( + " 5. 如仍有问题,请提交 issue 至 GitHub 仓库:github.com/YewFence/fly_vedio_assignment_away\n" + ) + if __name__ == "__main__": - asyncio.run(main()) + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n\n👋 程序已由用户中断,再见!")