From c1b221461a1b5c4d5005b25b7c7c8300a0c8357c Mon Sep 17 00:00:00 2001 From: fengmo Date: Mon, 16 Mar 2026 14:36:25 +0800 Subject: [PATCH 1/4] feat(health): add health checker and config validator Add HealthChecker class with 10 system checks: - config files, LLM providers, skills, dependencies - environment tools, disk space, channel credentials - MCP clients, required files, directory permissions Add ConfigValidator for semantic validation of config.json including channel, MCP client, and agent settings checks. --- src/copaw/config/health.py | 639 ++++++++++++++++++++++++++++++++++ src/copaw/config/validator.py | 252 ++++++++++++++ 2 files changed, 891 insertions(+) create mode 100644 src/copaw/config/health.py create mode 100644 src/copaw/config/validator.py diff --git a/src/copaw/config/health.py b/src/copaw/config/health.py new file mode 100644 index 000000000..ac562de4c --- /dev/null +++ b/src/copaw/config/health.py @@ -0,0 +1,639 @@ +# -*- coding: utf-8 -*- +"""System health checks for CoPaw.""" +from __future__ import annotations + +import importlib +import logging +import shutil +import sys +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional + +from ..constant import WORKING_DIR, ACTIVE_SKILLS_DIR + +logger = logging.getLogger(__name__) + + +class HealthStatus(str, Enum): + """Health check status.""" + + HEALTHY = "healthy" + DEGRADED = "degraded" # Partial functionality + UNHEALTHY = "unhealthy" # Critical issues + + +@dataclass +class HealthCheckResult: + """Single health check result.""" + + name: str + status: HealthStatus + message: str + details: dict = field(default_factory=dict) + suggestion: str = "" + + +@dataclass +class SystemHealth: + """Overall system health.""" + + status: HealthStatus + checks: list[HealthCheckResult] + + @property + def healthy_count(self) -> int: + return sum(1 for c in self.checks if c.status == HealthStatus.HEALTHY) + + @property + def degraded_count(self) -> int: + return sum(1 for c in self.checks if c.status == HealthStatus.DEGRADED) + + @property + def unhealthy_count(self) -> int: + return sum( + 1 for c in self.checks if c.status == HealthStatus.UNHEALTHY + ) + + +class HealthChecker: + """Performs system health checks.""" + + def __init__(self): + self.results: list[HealthCheckResult] = [] + + def check_all(self) -> SystemHealth: + """Run all health checks (including LLM connection test). + + Each check is wrapped in exception handling to ensure one failing check + doesn't prevent other checks from running. + """ + self.results = [] + + self._safe_check("config_files", self.check_config_files) + self._safe_check( + "providers", + lambda: self.check_providers(test_connection=True), + ) + self._safe_check("skills", self.check_skills) + self._safe_check("dependencies", self.check_dependencies) + self._safe_check("environment", self.check_environment) + self._safe_check("disk_space", self.check_disk_space) + self._safe_check("channels", self.check_channels) + self._safe_check("mcp_clients", self.check_mcp_clients) + self._safe_check("required_files", self.check_required_files) + self._safe_check("permissions", self.check_permissions) + + if any(r.status == HealthStatus.UNHEALTHY for r in self.results): + overall = HealthStatus.UNHEALTHY + elif any(r.status == HealthStatus.DEGRADED for r in self.results): + overall = HealthStatus.DEGRADED + else: + overall = HealthStatus.HEALTHY + + return SystemHealth(status=overall, checks=self.results) + + def _safe_check(self, check_name: str, check_func) -> None: + """Run a health check with exception protection. + + If the check raises an unexpected exception, record it as UNHEALTHY + instead of aborting the entire health check process. + """ + try: + check_func() + except Exception as e: + logger.exception(f"Unexpected error in {check_name} check") + self._add_result( + check_name, + HealthStatus.UNHEALTHY, + f"Check failed with unexpected error: {type(e).__name__}: {e}", + suggestion=( + "This is an unexpected error. " + "Please report this issue with the error details." + ), + ) + + def _add_result( + self, + name: str, + status: HealthStatus, + message: str, + details: Optional[dict] = None, + suggestion: str = "", + ) -> None: + self.results.append( + HealthCheckResult( + name=name, + status=status, + message=message, + details=details or {}, + suggestion=suggestion, + ), + ) + + def check_config_files(self) -> None: + """Check if essential config files exist.""" + from .utils import get_config_path + + config_path = get_config_path() + + if not config_path.exists(): + self._add_result( + "config_files", + HealthStatus.UNHEALTHY, + f"config.json not found at {config_path}", + suggestion="Run 'copaw init' to create configuration.", + ) + return + + try: + from .utils import load_config + + load_config(config_path) + + self._add_result( + "config_files", + HealthStatus.HEALTHY, + "Configuration files are present", + details={"config_path": str(config_path)}, + ) + except Exception as e: + self._add_result( + "config_files", + HealthStatus.UNHEALTHY, + f"Failed to load config.json: {e}", + suggestion=( + "Check config.json syntax or run 'copaw init --force'." + ), + ) + + def check_providers(self, test_connection: bool = False) -> None: + """Check if LLM providers are configured. + + Args: + test_connection: If True, actually test the API connection. + """ + try: + from ..providers import ProviderManager + + manager = ProviderManager.get_instance() + active = manager.get_active_model() + + if not active or not active.provider_id or not active.model: + self._add_result( + "providers", + HealthStatus.UNHEALTHY, + "No active LLM configured", + suggestion="Run 'copaw models' to configure a model.", + ) + return + + provider = manager.get_provider(active.provider_id) + + if not provider: + self._add_result( + "providers", + HealthStatus.UNHEALTHY, + f"Active provider '{active.provider_id}' not found", + suggestion=( + "Run 'copaw models' to select a valid provider." + ), + ) + return + + if provider.require_api_key and not provider.api_key: + self._add_result( + "providers", + HealthStatus.UNHEALTHY, + f"Provider '{provider.name}' is not configured", + suggestion=( + f"Configure {provider.name} API key" + f" via 'copaw models'." + ), + ) + return + + if test_connection: + connection_ok = self._test_llm_connection() + if not connection_ok: + self._add_result( + "providers", + HealthStatus.DEGRADED, + ( + f"Provider configured but connection" + f" test failed: {provider.name}" + f" / {active.model}" + ), + details={ + "provider": active.provider_id, + "model": active.model, + }, + suggestion=( + "Check API key, network connection," + " and API endpoint availability." + ), + ) + return + + self._add_result( + "providers", + HealthStatus.HEALTHY, + f"Active LLM: {provider.name} / {active.model}" + + (" (connection verified)" if test_connection else ""), + details={ + "provider": active.provider_id, + "model": active.model, + "connection_tested": test_connection, + }, + ) + + except Exception as e: + self._add_result( + "providers", + HealthStatus.DEGRADED, + f"Failed to check providers: {e}", + ) + + def check_skills(self) -> None: + """Check if skills directory exists and has skills.""" + skills_dir = ACTIVE_SKILLS_DIR + + if not skills_dir.exists(): + self._add_result( + "skills", + HealthStatus.DEGRADED, + f"Active skills directory not found: {skills_dir}", + suggestion=( + "Run 'copaw init' or 'copaw skills config'" + " to enable skills." + ), + ) + return + + skill_count = sum( + 1 + for d in skills_dir.iterdir() + if d.is_dir() and (d / "SKILL.md").exists() + ) + + if skill_count == 0: + self._add_result( + "skills", + HealthStatus.DEGRADED, + "No skills are enabled", + suggestion="Run 'copaw skills config' to enable skills.", + ) + else: + self._add_result( + "skills", + HealthStatus.HEALTHY, + f"{skill_count} skill(s) enabled", + details={"count": skill_count, "path": str(skills_dir)}, + ) + + def check_dependencies(self) -> None: + """Check if required Python packages are installed.""" + required = [ + ("agentscope", "AgentScope framework"), + ("click", "CLI framework"), + ("pydantic", "Configuration validation"), + ] + + missing_required = [] + + for package, desc in required: + try: + importlib.import_module(package) + except ImportError: + missing_required.append(f"{package} ({desc})") + + if missing_required: + self._add_result( + "dependencies", + HealthStatus.UNHEALTHY, + f"Missing required packages: {', '.join(missing_required)}", + suggestion="Run 'pip install copaw' to install dependencies.", + ) + else: + self._add_result( + "dependencies", + HealthStatus.HEALTHY, + "All required dependencies are installed", + ) + + def check_environment(self) -> None: + """Check environment variables and system tools.""" + issues = [] + + py_version = sys.version_info + if py_version < (3, 10): + issues.append( + f"Python {py_version.major}.{py_version.minor} " + f"(requires >= 3.10)", + ) + + if issues: + self._add_result( + "environment", + HealthStatus.DEGRADED, + f"Environment issues: {'; '.join(issues)}", + suggestion="Upgrade Python to 3.10 or higher.", + ) + else: + self._add_result( + "environment", + HealthStatus.HEALTHY, + "Environment is properly configured", + details={ + "python_version": ( + f"{py_version.major}" + f".{py_version.minor}" + f".{py_version.micro}" + ), + "platform": sys.platform, + }, + ) + + def check_disk_space(self) -> None: + """Check available disk space in working directory.""" + try: + stat = shutil.disk_usage(WORKING_DIR) + free_gb = stat.free / (1024**3) + + if free_gb < 1.0: + status = HealthStatus.UNHEALTHY + message = f"Very low disk space: {free_gb:.1f} GB free" + suggestion = "Free up disk space to avoid issues." + elif free_gb < 5.0: + status = HealthStatus.DEGRADED + message = f"Low disk space: {free_gb:.1f} GB free" + suggestion = "Consider freeing up disk space." + else: + status = HealthStatus.HEALTHY + message = f"Sufficient disk space: {free_gb:.1f} GB free" + suggestion = "" + + self._add_result( + "disk_space", + status, + message, + details={"free_gb": round(free_gb, 2)}, + suggestion=suggestion, + ) + + except Exception as e: + self._add_result( + "disk_space", + HealthStatus.DEGRADED, + f"Failed to check disk space: {e}", + ) + + async def _async_test_llm_connection(self) -> bool: + """Async helper to test LLM connection.""" + try: + from ..agents.model_factory import create_model_and_formatter + + model_instance, _ = create_model_and_formatter() + await model_instance( + messages=[{"role": "user", "content": "test"}], + max_tokens=1, + ) + return True + + except ImportError as e: + logger.error(f"Failed to import model factory: {e}") + return False + except ValueError as e: + logger.error(f"Invalid model configuration: {e}") + return False + except ConnectionError as e: + logger.warning(f"Network connection failed: {e}") + return False + except Exception as e: + logger.warning(f"LLM connection test failed: {e}") + return False + + def _test_llm_connection(self) -> bool: + """Test LLM API connection with a simple request.""" + import asyncio + import concurrent.futures + + try: + asyncio.get_running_loop() + with concurrent.futures.ThreadPoolExecutor() as executor: + future = executor.submit( + asyncio.run, + self._async_test_llm_connection(), + ) + return future.result() + except RuntimeError: + return asyncio.run(self._async_test_llm_connection()) + + def check_channels(self) -> None: + """Check enabled channels configuration.""" + try: + from .config import ChannelConfig + from .utils import load_config + from .validator import ConfigValidator + + config = load_config() + enabled_channels = [ + name + for name in ChannelConfig.model_fields + if getattr( + getattr(config.channels, name, None), + "enabled", + False, + ) + ] + + result = ConfigValidator(config).validate_all() + channel_issues = [ + i for i in result.issues if i.path.startswith("channels.") + ] + + if not enabled_channels: + self._add_result( + "channels", + HealthStatus.DEGRADED, + "No channels are enabled", + suggestion=( + "Enable at least one channel via" + " 'copaw channels config'." + ), + ) + elif channel_issues: + issue_list = [ + f"{i.path.split('.')[-2]}: {i.message}" + for i in channel_issues + ] + n_ch = len(enabled_channels) + n_is = len(channel_issues) + self._add_result( + "channels", + HealthStatus.UNHEALTHY, + f"{n_ch} channel(s) enabled, but {n_is} have issues", + details={ + "enabled": enabled_channels, + "issues": issue_list, + }, + suggestion=( + "Fix channel credentials via 'copaw channels config'." + ), + ) + else: + n_ch = len(enabled_channels) + self._add_result( + "channels", + HealthStatus.HEALTHY, + f"{n_ch} channel(s) properly configured", + details={"enabled": enabled_channels}, + ) + + except Exception as e: + self._add_result( + "channels", + HealthStatus.DEGRADED, + f"Failed to check channels: {e}", + ) + + def check_mcp_clients(self) -> None: + """Check MCP client configurations.""" + try: + from .utils import load_config + from .validator import ConfigValidator + + config = load_config() + result = ConfigValidator(config).validate_all() + mcp_issues = [ + i for i in result.issues if i.path.startswith("mcp.") + ] + enabled_clients = [ + cid for cid, cfg in config.mcp.clients.items() if cfg.enabled + ] + + if not enabled_clients: + self._add_result( + "mcp_clients", + HealthStatus.HEALTHY, + "No MCP clients configured (optional)", + details={"enabled": 0}, + ) + elif mcp_issues: + n_c = len(enabled_clients) + n_i = len(mcp_issues) + self._add_result( + "mcp_clients", + HealthStatus.DEGRADED, + f"{n_c} MCP client(s) enabled, but {n_i} have issues", + details={ + "enabled": enabled_clients, + "issues": [ + f"{i.path}: {i.message}" for i in mcp_issues + ], + }, + suggestion=( + "Check MCP client configuration in config.json." + ), + ) + else: + n_c = len(enabled_clients) + self._add_result( + "mcp_clients", + HealthStatus.HEALTHY, + f"{n_c} MCP client(s) properly configured", + details={"enabled": enabled_clients}, + ) + + except Exception as e: + self._add_result( + "mcp_clients", + HealthStatus.DEGRADED, + f"Failed to check MCP clients: {e}", + ) + + def check_required_files(self) -> None: + """Check if required Markdown files exist.""" + from ..constant import HEARTBEAT_FILE + + required_files = { + "AGENTS.md": "Agent behavior configuration", + HEARTBEAT_FILE: "Heartbeat query template", + "MEMORY.md": "Memory management instructions", + "SOUL.md": "Agent personality and values", + } + + missing = [] + empty = [] + + for filename, description in required_files.items(): + file_path = WORKING_DIR / filename + if not file_path.exists(): + missing.append(f"{filename} ({description})") + elif file_path.stat().st_size == 0: + empty.append(f"{filename} ({description})") + + if missing: + names = ", ".join(f.split(" ")[0] for f in missing) + self._add_result( + "required_files", + HealthStatus.UNHEALTHY, + f"Missing {len(missing)} required file(s): {names}", + details={"missing": missing}, + suggestion="Run 'copaw init' to create missing files.", + ) + elif empty: + names = ", ".join(f.split(" ")[0] for f in empty) + self._add_result( + "required_files", + HealthStatus.DEGRADED, + f"{len(empty)} required file(s) are empty: {names}", + details={"empty": empty}, + suggestion="Edit these files to configure agent behavior.", + ) + else: + self._add_result( + "required_files", + HealthStatus.HEALTHY, + "All required files are present", + details={"files": list(required_files.keys())}, + ) + + def check_permissions(self) -> None: + """Check working directory permissions.""" + import os + + critical_dirs = { + "working_dir": WORKING_DIR, + "active_skills": ACTIVE_SKILLS_DIR, + "memory": WORKING_DIR / "memory", + "file_store": WORKING_DIR / "file_store", + } + + issues = [] + + for name, dir_path in critical_dirs.items(): + if not dir_path.exists(): + continue + if not os.access(dir_path, os.R_OK): + issues.append(f"{name}: not readable") + if not os.access(dir_path, os.W_OK): + issues.append(f"{name}: not writable") + + if issues: + self._add_result( + "permissions", + HealthStatus.UNHEALTHY, + f"Permission issues in {len(issues)} location(s)", + details={"issues": issues}, + suggestion=( + "Fix directory permissions with 'chmod'" + " or check file ownership." + ), + ) + else: + self._add_result( + "permissions", + HealthStatus.HEALTHY, + "All directories have proper permissions", + ) diff --git a/src/copaw/config/validator.py b/src/copaw/config/validator.py new file mode 100644 index 000000000..a29be7f7d --- /dev/null +++ b/src/copaw/config/validator.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- +"""Configuration validation for CoPaw.""" +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass +from enum import Enum +from typing import Optional + +from .config import Config, ChannelConfig + +logger = logging.getLogger(__name__) + + +class ValidationLevel(str, Enum): + """Validation severity levels.""" + + ERROR = "error" # Blocks execution + WARNING = "warning" # May cause issues + INFO = "info" # Informational + + +@dataclass +class ValidationIssue: + """Single validation issue.""" + + level: ValidationLevel + path: str # Config path like "channels.dingtalk.client_id" + message: str + suggestion: str # Fix suggestion + code: str # Error code like "CHANNEL_MISSING_CREDENTIALS" + + +@dataclass +class ValidationResult: + """Validation result container.""" + + valid: bool + issues: list[ValidationIssue] + + @property + def errors(self) -> list[ValidationIssue]: + return [i for i in self.issues if i.level == ValidationLevel.ERROR] + + @property + def warnings(self) -> list[ValidationIssue]: + return [i for i in self.issues if i.level == ValidationLevel.WARNING] + + @property + def infos(self) -> list[ValidationIssue]: + return [i for i in self.issues if i.level == ValidationLevel.INFO] + + +class ConfigValidator: + """Validates config.json structure and semantics.""" + + def __init__(self, config: Optional[Config] = None): + from .utils import load_config + + self.config = config or load_config() + self.issues: list[ValidationIssue] = [] + + def validate_all(self) -> ValidationResult: + """Run all validation checks.""" + self.issues = [] + + self._validate_channels() + self._validate_mcp() + self._validate_agents() + self._validate_heartbeat() + + has_errors = any(i.level == ValidationLevel.ERROR for i in self.issues) + return ValidationResult(valid=not has_errors, issues=self.issues) + + def _add_issue( + self, + level: ValidationLevel, + path: str, + message: str, + suggestion: str, + code: str, + ) -> None: + self.issues.append( + ValidationIssue( + level=level, + path=path, + message=message, + suggestion=suggestion, + code=code, + ), + ) + + def _validate_channels(self) -> None: + """Validate all channel configurations.""" + channels = self.config.channels + + enabled_channels = self._get_enabled_channels(channels) + if not enabled_channels: + self._add_issue( + ValidationLevel.WARNING, + "channels", + "No channels are enabled", + "Enable at least one channel " + "(console, dingtalk, feishu, etc.) " + "in config.json or run 'copaw init' to configure channels.", + "NO_CHANNELS_ENABLED", + ) + + if channels.dingtalk.enabled: + self._validate_dingtalk(channels.dingtalk) + if channels.feishu.enabled: + self._validate_feishu(channels.feishu) + if channels.qq.enabled: + self._validate_qq(channels.qq) + if channels.discord.enabled: + self._validate_discord(channels.discord) + if channels.telegram.enabled: + self._validate_telegram(channels.telegram) + + def _get_enabled_channels(self, channels: ChannelConfig) -> list[str]: + """Get list of enabled channel names.""" + enabled = [] + for name in ChannelConfig.model_fields: + channel = getattr(channels, name, None) + if channel and getattr(channel, "enabled", False): + enabled.append(name) + return enabled + + def _validate_dingtalk(self, config) -> None: + """Validate DingTalk channel configuration.""" + if not config.client_id or not config.client_secret: + self._add_issue( + ValidationLevel.ERROR, + "channels.dingtalk", + "DingTalk is enabled but missing credentials", + "Set 'client_id' and 'client_secret' in config.json under " + "channels.dingtalk, or run 'copaw channels config'.", + "DINGTALK_MISSING_CREDENTIALS", + ) + + def _validate_feishu(self, config) -> None: + """Validate Feishu channel configuration.""" + if not config.app_id or not config.app_secret: + self._add_issue( + ValidationLevel.ERROR, + "channels.feishu", + "Feishu is enabled but missing credentials", + "Set 'app_id' and 'app_secret' in config.json under " + "channels.feishu, or run 'copaw channels config'.", + "FEISHU_MISSING_CREDENTIALS", + ) + + def _validate_qq(self, config) -> None: + """Validate QQ channel configuration.""" + if not config.app_id or not config.client_secret: + self._add_issue( + ValidationLevel.ERROR, + "channels.qq", + "QQ is enabled but missing credentials", + "Set 'app_id' and 'client_secret' in config.json under " + "channels.qq, or run 'copaw channels config'.", + "QQ_MISSING_CREDENTIALS", + ) + + def _validate_discord(self, config) -> None: + """Validate Discord channel configuration.""" + if not config.bot_token: + self._add_issue( + ValidationLevel.ERROR, + "channels.discord", + "Discord is enabled but missing bot_token", + "Set 'bot_token' in config.json under channels.discord, " + "or run 'copaw channels config'.", + "DISCORD_MISSING_TOKEN", + ) + + def _validate_telegram(self, config) -> None: + """Validate Telegram channel configuration.""" + if not config.bot_token: + self._add_issue( + ValidationLevel.ERROR, + "channels.telegram", + "Telegram is enabled but missing bot_token", + "Set 'bot_token' in config.json under channels.telegram, " + "or run 'copaw channels config'.", + "TELEGRAM_MISSING_TOKEN", + ) + + def _validate_mcp(self) -> None: + """Validate MCP client configurations. + + Note: Transport-specific validation (stdio requires command, + http requires url) is already handled by Pydantic's model_validator + in MCPClientConfig. If load_config() succeeds, those constraints are + already satisfied. This method is kept for future semantic validations. + """ + + def _validate_agents(self) -> None: + """Validate agents configuration.""" + agents = self.config.agents + + if agents.running.max_iters < 1: + self._add_issue( + ValidationLevel.ERROR, + "agents.running.max_iters", + f"max_iters must be >= 1, got {agents.running.max_iters}", + "Set agents.running.max_iters to a positive integer" + " (default: 50).", + "AGENTS_INVALID_MAX_ITERS", + ) + + if agents.running.max_input_length < 1000: + self._add_issue( + ValidationLevel.WARNING, + "agents.running.max_input_length", + f"max_input_length is very small:" + f" {agents.running.max_input_length}", + "Consider increasing to at least 4096 tokens" + " for better context.", + "AGENTS_SMALL_INPUT_LENGTH", + ) + + def _validate_heartbeat(self) -> None: + """Validate heartbeat configuration.""" + hb = self.config.agents.defaults.heartbeat + if not hb or not hb.enabled: + return + + if not self._is_valid_interval(hb.every): + self._add_issue( + ValidationLevel.ERROR, + "agents.defaults.heartbeat.every", + f"Invalid interval format: {hb.every}", + "Use format like '30m', '1h', '2h30m'.", + "HEARTBEAT_INVALID_INTERVAL", + ) + + @staticmethod + def _is_valid_interval(interval: str) -> bool: + """Check if interval string is valid with proper unit ordering. + + Valid formats: '30m', '1h', '2h30m', '1d12h30m', etc. + Units must be in descending order: + d (days) -> h (hours) -> m (minutes) -> s (seconds) + """ + pattern = r"^(?:(\d+)d)?(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?$" + match = re.match(pattern, interval) + if not match: + return False + # Ensure at least one unit is present (not all None) + return any(match.groups()) From 0e0e3a57d7311e87843f764e44ad2a3ac395d7a3 Mon Sep 17 00:00:00 2001 From: fengmo Date: Mon, 16 Mar 2026 14:36:31 +0800 Subject: [PATCH 2/4] feat(cli): add copaw health command for system diagnostics Add `copaw health` command that runs comprehensive health checks and configuration validation with human-readable and JSON output. Integrate health check into `copaw init` post-initialization step so users get immediate feedback on their setup. Options: --verbose / -v Show detailed check information --json Output results in JSON format --- src/copaw/cli/health_cmd.py | 187 ++++++++++++++++++++++++++++++++++++ src/copaw/cli/init_cmd.py | 52 ++++++++++ src/copaw/cli/main.py | 6 ++ src/copaw/config/utils.py | 10 +- 4 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 src/copaw/cli/health_cmd.py diff --git a/src/copaw/cli/health_cmd.py b/src/copaw/cli/health_cmd.py new file mode 100644 index 000000000..afa67ae93 --- /dev/null +++ b/src/copaw/cli/health_cmd.py @@ -0,0 +1,187 @@ +# -*- coding: utf-8 -*- +"""CLI health command: comprehensive system health check and diagnostics.""" +import json +import click + +from ..config.validator import ConfigValidator +from ..config.health import HealthChecker, HealthStatus + + +def _print_checks(health, verbose, status_icons, status_colors) -> None: + """Print individual health check results.""" + for check in health.checks: + icon = status_icons[check.status] + color = status_colors[check.status] + click.secho(f"{icon} {check.name}: ", fg=color, nl=False) + click.echo(check.message) + + if verbose and check.details: + for key, value in check.details.items(): + click.echo(f" {key}: {value}") + + if check.suggestion: + click.secho(f" → {check.suggestion}", fg="cyan") + + +def _print_validation(validation) -> None: + """Print configuration validation errors and warnings.""" + if not (validation.errors or validation.warnings): + return + + click.echo("\n" + "=" * 60) + click.echo("Configuration Validation") + click.echo("=" * 60 + "\n") + + if validation.errors: + n = len(validation.errors) + click.secho(f"✗ Found {n} error(s):", fg="red", bold=True) + for issue in validation.errors: + click.secho(f"\n {issue.path}", fg="red", bold=True) + click.echo(f" {issue.message}") + click.secho(f" → {issue.suggestion}", fg="cyan") + + if validation.warnings: + n = len(validation.warnings) + click.secho(f"\n⚠ Found {n} warning(s):", fg="yellow", bold=True) + for issue in validation.warnings: + click.secho(f"\n {issue.path}", fg="yellow", bold=True) + click.echo(f" {issue.message}") + if issue.suggestion: + click.secho(f" → {issue.suggestion}", fg="cyan") + + +@click.command("health") +@click.option( + "--json", + "output_json", + is_flag=True, + help="Output results in JSON format.", +) +@click.option( + "--verbose", + "-v", + is_flag=True, + help="Show detailed information.", +) +def health_cmd(output_json: bool, verbose: bool) -> None: + """Run comprehensive system health check and configuration validation. + + This command performs: + 1. System health checks (10 checks): + - Configuration files + - LLM providers and connection test + - Skills availability + - Python dependencies + - Environment and system tools + - Disk space + - Channel credentials + - MCP clients + - Required files + - Directory permissions + + 2. Configuration validation: + - Semantic validation of config.json + - Channel configuration checks + - MCP client validation + - Agent settings validation + """ + if not output_json: + click.echo("\n🐾 CoPaw System Health Check\n") + + # Run health checks + checker = HealthChecker() + health = checker.check_all() + + # Run configuration validation + validator = ConfigValidator() + validation = validator.validate_all() + + # JSON output + if output_json: + result = { + "health": { + "status": health.status.value, + "summary": { + "healthy": health.healthy_count, + "degraded": health.degraded_count, + "unhealthy": health.unhealthy_count, + }, + "checks": [ + { + "name": check.name, + "status": check.status.value, + "message": check.message, + "details": check.details, + "suggestion": check.suggestion, + } + for check in health.checks + ], + }, + "validation": { + "valid": validation.valid, + "error_count": len(validation.errors), + "warning_count": len(validation.warnings), + "issues": [ + { + "level": issue.level.value, + "path": issue.path, + "message": issue.message, + "suggestion": issue.suggestion, + "code": issue.code, + } + for issue in validation.issues + ], + }, + } + click.echo(json.dumps(result, indent=2, ensure_ascii=False)) + return + + # Human-readable output + status_icons = { + HealthStatus.HEALTHY: "✓", + HealthStatus.DEGRADED: "⚠", + HealthStatus.UNHEALTHY: "✗", + } + status_colors = { + HealthStatus.HEALTHY: "green", + HealthStatus.DEGRADED: "yellow", + HealthStatus.UNHEALTHY: "red", + } + + _print_checks(health, verbose, status_icons, status_colors) + _print_validation(validation) + + # Overall summary + click.echo("\n" + "=" * 60) + + has_critical = ( + health.status == HealthStatus.UNHEALTHY or not validation.valid + ) + has_warnings = health.status == HealthStatus.DEGRADED or bool( + validation.warnings, + ) + + if has_critical: + click.secho("✗ System has critical issues", fg="red", bold=True) + click.echo( + f" Health: {health.unhealthy_count} critical," + f" {health.degraded_count} warnings", + ) + if not validation.valid: + click.echo( + f" Config: {len(validation.errors)} errors," + f" {len(validation.warnings)} warnings", + ) + elif validation.warnings: + click.echo(f" Config: {len(validation.warnings)} warnings") + elif has_warnings: + click.secho( + "⚠ System is operational with warnings", + fg="yellow", + bold=True, + ) + click.echo(f" Health: {health.degraded_count} warnings") + if validation.warnings: + click.echo(f" Config: {len(validation.warnings)} warnings") + else: + click.secho("✓ All checks passed!", fg="green", bold=True) diff --git a/src/copaw/cli/init_cmd.py b/src/copaw/cli/init_cmd.py index ace9dca3a..f52edddb1 100644 --- a/src/copaw/cli/init_cmd.py +++ b/src/copaw/cli/init_cmd.py @@ -427,3 +427,55 @@ def init_cmd( click.echo(f"✓ Heartbeat query saved to {heartbeat_path}") click.echo("\n✓ Initialization complete!") + + # Run health check after initialization + click.echo("\n" + "=" * 60) + click.echo("Running system health check...") + click.echo("=" * 60) + + from ..config.health import HealthChecker, HealthStatus + + checker = HealthChecker() + health = checker.check_all() + + # Display results + status_icons = { + HealthStatus.HEALTHY: "✓", + HealthStatus.DEGRADED: "⚠", + HealthStatus.UNHEALTHY: "✗", + } + + for check in health.checks: + icon = status_icons[check.status] + click.echo(f"{icon} {check.name}: {check.message}") + if check.suggestion: + click.secho(f" → {check.suggestion}", fg="cyan") + + click.echo() + if health.status == HealthStatus.HEALTHY: + click.secho( + "✓ All checks passed! You're ready to use CoPaw.", + fg="green", + bold=True, + ) + click.echo("\nNext steps:") + click.echo(" • Run 'copaw app' to start the web console") + click.echo(" • Visit http://localhost:8088 in your browser") + elif health.status == HealthStatus.DEGRADED: + click.secho( + f"⚠ System is degraded ({health.degraded_count} warnings)", + fg="yellow", + bold=True, + ) + click.echo( + "\nYou can still use CoPaw, but some features may not work.", + ) + click.echo("Run 'copaw health' anytime to check system status.") + else: + click.secho( + f"✗ System has critical issues ({health.unhealthy_count} errors)", + fg="red", + bold=True, + ) + click.echo("\nPlease fix the errors above before using CoPaw.") + click.echo("Run 'copaw health' to check status after fixing.") diff --git a/src/copaw/cli/main.py b/src/copaw/cli/main.py index 9491f5c13..c933b6fb3 100644 --- a/src/copaw/cli/main.py +++ b/src/copaw/cli/main.py @@ -71,6 +71,11 @@ def _record(label: str, elapsed: float) -> None: _record(".cron_cmd", time.perf_counter() - _t) +_t = time.perf_counter() +from .health_cmd import health_cmd # noqa: E402 + +_record(".health_cmd", time.perf_counter() - _t) + _t = time.perf_counter() from .env_cmd import env_group # noqa: E402 @@ -157,6 +162,7 @@ def cli(ctx: click.Context, host: str | None, port: int | None) -> None: cli.add_command(clean_cmd) cli.add_command(cron_group) cli.add_command(env_group) +cli.add_command(health_cmd) cli.add_command(init_cmd) cli.add_command(models_group) cli.add_command(skills_group) diff --git a/src/copaw/config/utils.py b/src/copaw/config/utils.py index e2fb3fe97..499b7930e 100644 --- a/src/copaw/config/utils.py +++ b/src/copaw/config/utils.py @@ -17,7 +17,12 @@ RUNNING_IN_CONTAINER, WORKING_DIR, ) -from .config import Config, HeartbeatConfig, LastApiConfig, LastDispatchConfig +from .config import ( + Config, + HeartbeatConfig, + LastApiConfig, + LastDispatchConfig, +) def _discover_system_chromium_path() -> Optional[str]: @@ -343,7 +348,8 @@ def load_config(config_path: Optional[Path] = None) -> Config: la["host"] = data.get("last_api_host") if "port" not in la and "last_api_port" in data: la["port"] = data.get("last_api_port") - return Config.model_validate(data) + config = Config.model_validate(data) + return config def save_config(config: Config, config_path: Optional[Path] = None) -> None: From 11a6f810fe2d0a9dc54b0a84c541c15a4ea7dbb1 Mon Sep 17 00:00:00 2001 From: fengmo Date: Mon, 16 Mar 2026 14:49:11 +0800 Subject: [PATCH 3/4] fix: address code review findings in health command - Add logger.exception() to all broad except blocks in health.py so full tracebacks are preserved in logs (config_files, providers, channels, mcp_clients checks) - Change check_providers/channels/mcp_clients failure status from DEGRADED to UNHEALTHY for more accurate severity reporting - Narrow check_disk_space to catch OSError only instead of Exception - Add timeout=30 to future.result() in _test_llm_connection to prevent indefinite hang when LLM endpoint is unreachable - Wrap ConfigValidator() construction in try/except to show user-friendly error instead of raw Python traceback - Add if-guard for issue.suggestion in errors display branch - Remove redundant variable assignment in utils.load_config --- src/copaw/cli/health_cmd.py | 16 +++++++++++++--- src/copaw/config/health.py | 35 +++++++++++++++++++++++++---------- src/copaw/config/utils.py | 3 +-- 3 files changed, 39 insertions(+), 15 deletions(-) diff --git a/src/copaw/cli/health_cmd.py b/src/copaw/cli/health_cmd.py index afa67ae93..7a7a6be1d 100644 --- a/src/copaw/cli/health_cmd.py +++ b/src/copaw/cli/health_cmd.py @@ -38,7 +38,8 @@ def _print_validation(validation) -> None: for issue in validation.errors: click.secho(f"\n {issue.path}", fg="red", bold=True) click.echo(f" {issue.message}") - click.secho(f" → {issue.suggestion}", fg="cyan") + if issue.suggestion: + click.secho(f" → {issue.suggestion}", fg="cyan") if validation.warnings: n = len(validation.warnings) @@ -93,8 +94,17 @@ def health_cmd(output_json: bool, verbose: bool) -> None: health = checker.check_all() # Run configuration validation - validator = ConfigValidator() - validation = validator.validate_all() + try: + validator = ConfigValidator() + validation = validator.validate_all() + except Exception as e: + if not output_json: + click.secho( + f"Configuration validation failed: {e}", + fg="red", + ) + click.echo("Run 'copaw health' again or check config.json.") + return # JSON output if output_json: diff --git a/src/copaw/config/health.py b/src/copaw/config/health.py index ac562de4c..0293fb4c2 100644 --- a/src/copaw/config/health.py +++ b/src/copaw/config/health.py @@ -158,10 +158,11 @@ def check_config_files(self) -> None: details={"config_path": str(config_path)}, ) except Exception as e: + logger.exception("Unexpected error while checking config files") self._add_result( "config_files", HealthStatus.UNHEALTHY, - f"Failed to load config.json: {e}", + f"Failed to load config.json: {type(e).__name__}: {e}", suggestion=( "Check config.json syntax or run 'copaw init --force'." ), @@ -248,10 +249,14 @@ def check_providers(self, test_connection: bool = False) -> None: ) except Exception as e: + logger.exception("Unexpected error while checking providers") self._add_result( "providers", - HealthStatus.DEGRADED, - f"Failed to check providers: {e}", + HealthStatus.UNHEALTHY, + f"Failed to check providers: {type(e).__name__}: {e}", + suggestion=( + "Run 'copaw health --verbose' and check logs for details." + ), ) def check_skills(self) -> None: @@ -381,11 +386,13 @@ def check_disk_space(self) -> None: suggestion=suggestion, ) - except Exception as e: + except OSError as e: + logger.exception("OS error while checking disk space") self._add_result( "disk_space", - HealthStatus.DEGRADED, + HealthStatus.UNHEALTHY, f"Failed to check disk space: {e}", + suggestion="Check filesystem mount and permissions.", ) async def _async_test_llm_connection(self) -> bool: @@ -425,7 +432,7 @@ def _test_llm_connection(self) -> bool: asyncio.run, self._async_test_llm_connection(), ) - return future.result() + return future.result(timeout=30) except RuntimeError: return asyncio.run(self._async_test_llm_connection()) @@ -491,10 +498,14 @@ def check_channels(self) -> None: ) except Exception as e: + logger.exception("Unexpected error while checking channels") self._add_result( "channels", - HealthStatus.DEGRADED, - f"Failed to check channels: {e}", + HealthStatus.UNHEALTHY, + f"Failed to check channels: {type(e).__name__}: {e}", + suggestion=( + "Run 'copaw health --verbose' and check logs for details." + ), ) def check_mcp_clients(self) -> None: @@ -546,10 +557,14 @@ def check_mcp_clients(self) -> None: ) except Exception as e: + logger.exception("Unexpected error while checking MCP clients") self._add_result( "mcp_clients", - HealthStatus.DEGRADED, - f"Failed to check MCP clients: {e}", + HealthStatus.UNHEALTHY, + f"Failed to check MCP clients: {type(e).__name__}: {e}", + suggestion=( + "Run 'copaw health --verbose' and check logs for details." + ), ) def check_required_files(self) -> None: diff --git a/src/copaw/config/utils.py b/src/copaw/config/utils.py index 499b7930e..0256d1626 100644 --- a/src/copaw/config/utils.py +++ b/src/copaw/config/utils.py @@ -348,8 +348,7 @@ def load_config(config_path: Optional[Path] = None) -> Config: la["host"] = data.get("last_api_host") if "port" not in la and "last_api_port" in data: la["port"] = data.get("last_api_port") - config = Config.model_validate(data) - return config + return Config.model_validate(data) def save_config(config: Config, config_path: Optional[Path] = None) -> None: From dbf461a85d3793ab8cf86fca633205fbac18045d Mon Sep 17 00:00:00 2001 From: fengmo Date: Mon, 16 Mar 2026 15:02:33 +0800 Subject: [PATCH 4/4] fix: add type hints to _print_checks and _print_validation Add explicit type annotations to the two private helper functions in health_cmd.py for improved readability and static analysis. Co-Authored-By: Claude Sonnet 4.6 --- src/copaw/cli/health_cmd.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/copaw/cli/health_cmd.py b/src/copaw/cli/health_cmd.py index 7a7a6be1d..7eb531b37 100644 --- a/src/copaw/cli/health_cmd.py +++ b/src/copaw/cli/health_cmd.py @@ -3,11 +3,16 @@ import json import click -from ..config.validator import ConfigValidator -from ..config.health import HealthChecker, HealthStatus +from ..config.validator import ConfigValidator, ValidationResult +from ..config.health import HealthChecker, HealthStatus, SystemHealth -def _print_checks(health, verbose, status_icons, status_colors) -> None: +def _print_checks( + health: SystemHealth, + verbose: bool, + status_icons: dict[HealthStatus, str], + status_colors: dict[HealthStatus, str], +) -> None: """Print individual health check results.""" for check in health.checks: icon = status_icons[check.status] @@ -23,7 +28,7 @@ def _print_checks(health, verbose, status_icons, status_colors) -> None: click.secho(f" → {check.suggestion}", fg="cyan") -def _print_validation(validation) -> None: +def _print_validation(validation: ValidationResult) -> None: """Print configuration validation errors and warnings.""" if not (validation.errors or validation.warnings): return