diff --git a/.idea/workspace.xml b/.idea/workspace.xml
index 536d516..d4667b2 100644
--- a/.idea/workspace.xml
+++ b/.idea/workspace.xml
@@ -4,7 +4,14 @@
-
+
+
+
+
+
+
+
+
@@ -48,7 +55,7 @@
"Python tests.Python tests for threshold.TestThreshold.test_alert_definition_create_violation.executor": "Run",
"Python tests.Python tests for threshold.TestThreshold.test_metric_data_to_dict.executor": "Run",
"RunOnceActivity.ShowReadmeOnStart": "true",
- "git-widget-placeholder": "main",
+ "git-widget-placeholder": "7-add-cli-functionality",
"last_opened_file_path": "/Users/rakibulhaq/Projects/packages/python/pysentinel/assets"
}
}]]>
@@ -61,7 +68,7 @@
-
+
diff --git a/README.md b/README.md
index d30ba6c..9159c37 100644
--- a/README.md
+++ b/README.md
@@ -149,6 +149,220 @@ async def root():
```
**This example shows how to integrate pysentinel with FastAPI, starting the scanner in the background when the application starts.**
+
+## CLI Installation
+
+### Install from PyPI (Recommended)
+
+```bash
+# Install PySentinel with CLI support
+pip install pysentinel
+
+# Or using Poetry
+poetry add pysentinel
+```
+
+After installation, the `pysentinel` command will be available in your terminal.
+
+## CLI Usage
+
+PySentinel provides a command-line interface for running the scanner with configuration files.
+
+### Basic Usage
+
+```bash
+# Run scanner synchronously (blocking)
+pysentinel config.yml
+
+# Run scanner asynchronously (non-blocking)
+pysentinel config.yml --async
+
+# Use JSON configuration
+pysentinel /path/to/config.json
+
+# Show help
+pysentinel --help
+
+# Show version
+pysentinel --version
+```
+
+### Configuration File
+
+Create a YAML or JSON configuration file:
+
+**Example `config.yml`:**
+```yaml
+scanner:
+ interval: 30
+ timeout: 10
+
+alerts:
+ email:
+ enabled: true
+ smtp_server: "smtp.example.com"
+ recipients:
+ - "admin@example.com"
+
+thresholds:
+ cpu_usage: 80
+ memory_usage: 85
+```
+
+### CLI Examples
+
+```bash
+# Start monitoring with 30-second intervals
+pysentinel production-config.yml
+
+# Run in background mode (async)
+pysentinel monitoring.yml --async
+
+# Use absolute path to config
+pysentinel /etc/pysentinel/config.yml
+
+# Quick help
+pysentinel -h
+```
+
+### Exit Codes
+
+- `0` - Success or user interrupted (Ctrl+C)
+- `1` - Configuration or scanner error
+
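+These exit codes make it easy to drive the CLI from a wrapper script or supervisor. Below is a minimal illustrative sketch (not part of the package) that launches the scanner with the standard-library `subprocess` module and branches on the exit status; the config path is a placeholder:
+
+```python
+import subprocess
+import sys
+
+# Run the PySentinel CLI as a child process and wait for it to exit.
+result = subprocess.run(["pysentinel", "config.yml"])
+
+if result.returncode == 0:
+    # 0 covers both a clean shutdown and a user interrupt (Ctrl+C).
+    print("Scanner stopped cleanly.")
+else:
+    # 1 indicates a configuration or scanner error.
+    sys.exit(result.returncode)
+```
+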
+## Docker Usage
+
+### Running PySentinel CLI in Docker
+
+You can run PySentinel inside a Docker container for isolated execution and easy deployment.
+
+**Create a Dockerfile:**
+
+```dockerfile
+FROM python:3.11-slim
+
+# Install PySentinel
+RUN pip install pysentinel
+
+# Create app directory
+WORKDIR /app
+
+# Copy configuration file
+COPY config.yml /app/config.yml
+
+# Run PySentinel CLI
+CMD ["pysentinel", "config.yml"]
+```
+
+### Build and Run the Docker Container
+
+```bash
+# Build the Docker image
+docker build -t pysentinel-app .
+
+# Run synchronously
+docker run --rm pysentinel-app
+
+# Run asynchronously
+docker run --rm pysentinel-app pysentinel config.yml --async
+
+# Mount external config file
+docker run --rm -v /path/to/your/config.yml:/app/config.yml pysentinel-app
+
+# Run with environment variables for database connections
+docker run --rm \
+ -e DB_HOST=host.docker.internal \
+ -e DB_PORT=5432 \
+ -v /path/to/config.yml:/app/config.yml \
+ pysentinel-app
+```
+
+### Docker Compose Example
+
+Create a `docker-compose.yml` file to run PySentinel with a PostgreSQL database:
+
+```yaml
+version: '3.8'
+
+services:
+ pysentinel:
+ image: python:3.11-slim
+ command: >
+ sh -c "pip install pysentinel &&
+ pysentinel /app/config.yml --async"
+ volumes:
+ - ./config.yml:/app/config.yml
+ - ./logs:/app/logs
+ environment:
+ - DB_HOST=postgres
+ - DB_USER=sentinel_user
+ - DB_PASSWORD=sentinel_pass
+ depends_on:
+ - postgres
+ restart: unless-stopped
+
+ postgres:
+ image: postgres:15
+ environment:
+ POSTGRES_DB: monitoring
+ POSTGRES_USER: sentinel_user
+ POSTGRES_PASSWORD: sentinel_pass
+ volumes:
+ - postgres_data:/var/lib/postgresql/data
+
+volumes:
+ postgres_data:
+```
+
+This `docker-compose.yml` sets up a PySentinel service that connects to a PostgreSQL database, allowing you to run the scanner with persistent data storage.
+
+### Run with Docker Compose
+
+```bash
+# Start the monitoring stack
+docker-compose up -d
+
+# View logs
+docker-compose logs pysentinel
+
+# Stop the stack
+docker-compose down
+```
+
+### Production Docker Setup
+
+A multi-stage Dockerfile for production use:
+
+```dockerfile
+FROM python:3.11-slim as builder
+
+# Install dependencies
+RUN pip install --no-cache-dir pysentinel
+
+FROM python:3.11-slim
+
+# Copy installed packages
+COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
+COPY --from=builder /usr/local/bin/pysentinel /usr/local/bin/pysentinel
+
+# Create non-root user
+RUN useradd --create-home --shell /bin/bash sentinel
+
+# Set working directory
+WORKDIR /app
+
+# Change ownership
+RUN chown -R sentinel:sentinel /app
+
+# Switch to non-root user
+USER sentinel
+
+# Health check
+HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+ CMD pysentinel --version || exit 1
+
+# Default command
+CMD ["pysentinel", "config.yml", "--async"]
+```
+
## Configuration
Here’s how to use the `load_config()` function from `pysentinel.config.loader` to load your YAML config and start the scanner.
This approach works for both YAML and JSON config files.
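
A minimal sketch of that flow, mirroring what the new CLI entry point does (synchronous start; `config.yml` is a placeholder path):

```python
from pysentinel.config.loader import load_config
from pysentinel.core.scanner import Scanner

# Parse the YAML (or JSON) configuration file into a dict.
config = load_config("config.yml")

# Build the scanner from the loaded configuration and run it (blocking).
scanner = Scanner(config)
scanner.start()
```
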
diff --git a/pyproject.toml b/pyproject.toml
index 3bce2c5..fc73b9f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pysentinel"
-version = "0.1.3"
+version = "0.1.4"
description = "A python package for threshold based alerting using simple configuration."
authors = [
"Rakibul Haq ",
@@ -26,3 +26,6 @@ pytest-asyncio = "^1.0.0"
[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"
+
+[tool.poetry.scripts]
+pysentinel = "pysentinel.cli.cli:main"
diff --git a/pysentinel/channels/__init__.py b/pysentinel/channels/__init__.py
index f02b021..737c82c 100644
--- a/pysentinel/channels/__init__.py
+++ b/pysentinel/channels/__init__.py
@@ -1,4 +1,4 @@
from .email import Email
from .telegram import Telegram
from .slack import Slack
-from .webhook import Webhook
\ No newline at end of file
+from .webhook import Webhook
diff --git a/pysentinel/channels/email.py b/pysentinel/channels/email.py
index ab3d175..76af964 100644
--- a/pysentinel/channels/email.py
+++ b/pysentinel/channels/email.py
@@ -13,9 +13,11 @@ class Email(AlertChannel):
async def send_alert(self, violation: Violation) -> bool:
try:
msg = MIMEMultipart()
- msg['From'] = self.config['from_address']
- msg['To'] = ', '.join(self.config['recipients'])
- msg['Subject'] = self.config['subject_template'].format(alert_title=violation.alert_name)
+ msg["From"] = self.config["from_address"]
+ msg["To"] = ", ".join(self.config["recipients"])
+ msg["Subject"] = self.config["subject_template"].format(
+ alert_title=violation.alert_name
+ )
body = f"""
Alert: {violation.alert_name}
@@ -27,18 +29,20 @@ async def send_alert(self, violation: Violation) -> bool:
Time: {violation.timestamp}
"""
- msg.attach(MIMEText(body, 'plain'))
+ msg.attach(MIMEText(body, "plain"))
- password = self.config['password']
- if password.startswith('${') and password.endswith('}'):
+ password = self.config["password"]
+ if password.startswith("${") and password.endswith("}"):
env_var = password[2:-1]
password = os.getenv(env_var, password)
- server = smtplib.SMTP(self.config['smtp_server'], self.config['smtp_port'])
+ server = smtplib.SMTP(self.config["smtp_server"], self.config["smtp_port"])
server.starttls()
- server.login(self.config['username'], password)
+ server.login(self.config["username"], password)
text = msg.as_string()
- server.sendmail(self.config['from_address'], self.config['recipients'], text)
+ server.sendmail(
+ self.config["from_address"], self.config["recipients"], text
+ )
server.quit()
return True
diff --git a/pysentinel/channels/slack.py b/pysentinel/channels/slack.py
index 760f4fb..fe42b38 100644
--- a/pysentinel/channels/slack.py
+++ b/pysentinel/channels/slack.py
@@ -11,50 +11,60 @@ async def send_alert(self, violation: Violation) -> bool:
try:
payload = {
- "channel": self.config['channel'],
- "username": self.config['username'],
- "icon_emoji": self.config['icon_emoji'],
+ "channel": self.config["channel"],
+ "username": self.config["username"],
+ "icon_emoji": self.config["icon_emoji"],
"text": f"🚨 *{violation.severity.value.upper()}* Alert: {violation.alert_name}",
"attachments": [
{
- "color": "danger" if violation.severity == Severity.CRITICAL else "warning",
+ "color": (
+ "danger"
+ if violation.severity == Severity.CRITICAL
+ else "warning"
+ ),
"fields": [
{
"title": "Message",
"value": violation.message,
- "short": False
+ "short": False,
},
{
"title": "Current Value",
"value": str(violation.current_value),
- "short": True
+ "short": True,
},
{
"title": "Threshold",
"value": f"{violation.operator} {violation.threshold_value}",
- "short": True
+ "short": True,
},
{
"title": "Datasource",
"value": violation.datasource_name,
- "short": True
+ "short": True,
},
{
"title": "Time",
- "value": violation.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC"),
- "short": True
- }
- ]
+ "value": violation.timestamp.strftime(
+ "%Y-%m-%d %H:%M:%S UTC"
+ ),
+ "short": True,
+ },
+ ],
}
- ]
+ ],
}
# Add mentions if configured
- if 'mention_users' in self.config:
- payload['text'] = f"{' '.join(self.config['mention_users'])} {payload['text']}"
+ if "mention_users" in self.config:
+ payload["text"] = (
+ f"{' '.join(self.config['mention_users'])} {payload['text']}"
+ )
async with aiohttp.ClientSession() as session:
- async with session.post(self.config['webhook_url'], json=payload) as response:
+ async with session.post(
+ self.config["webhook_url"], json=payload
+ ) as response:
return response.status == 200
except Exception as e:
logger.error(f"Failed to send Slack alert: {e}")
diff --git a/pysentinel/channels/telegram.py b/pysentinel/channels/telegram.py
index 7bcad88..1313791 100644
--- a/pysentinel/channels/telegram.py
+++ b/pysentinel/channels/telegram.py
@@ -10,18 +10,20 @@ async def send_alert(self, violation: Violation) -> bool:
try:
payload = {
- "chat_id": self.config['chat_id'],
+ "chat_id": self.config["chat_id"],
"text": f"🚨 *{violation.severity.value.upper()}* Alert: {violation.alert_name}\n"
- f"Message: {violation.message}\n"
- f"Current Value: {violation.current_value}\n"
- f"Threshold: {violation.operator} {violation.threshold_value}\n"
- f"Datasource: {violation.datasource_name}\n"
- f"Time: {violation.timestamp.strftime('%Y-%m-%d %H:%M:%S UTC')}",
+ f"Message: {violation.message}\n"
+ f"Current Value: {violation.current_value}\n"
+ f"Threshold: {violation.operator} {violation.threshold_value}\n"
+ f"Datasource: {violation.datasource_name}\n"
+ f"Time: {violation.timestamp.strftime('%Y-%m-%d %H:%M:%S UTC')}",
"parse_mode": "Markdown",
}
async with aiohttp.ClientSession() as session:
- async with session.post(self.config['webhook_url'], json=payload) as response:
+ async with session.post(
+ self.config["webhook_url"], json=payload
+ ) as response:
return response.status == 200
except Exception as e:
logger.error(f"Failed to send Telegram alert: {e}")
diff --git a/pysentinel/channels/webhook.py b/pysentinel/channels/webhook.py
index aa77d4f..c565b46 100644
--- a/pysentinel/channels/webhook.py
+++ b/pysentinel/channels/webhook.py
@@ -12,28 +12,32 @@ async def send_alert(self, violation: Violation) -> bool:
import aiohttp
try:
- headers = self.config.get('headers', {})
+ headers = self.config.get("headers", {})
# Replace environment variables in headers
for key, value in headers.items():
- if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
+ if (
+ isinstance(value, str)
+ and value.startswith("${")
+ and value.endswith("}")
+ ):
env_var = value[2:-1]
headers[key] = os.getenv(env_var, value)
payload = violation.to_dict()
async with aiohttp.ClientSession() as session:
- for attempt in range(self.config.get('retry_count', 1)):
+ for attempt in range(self.config.get("retry_count", 1)):
try:
async with session.request(
- self.config.get('method', 'POST'),
- self.config['url'],
- json=payload,
- headers=headers
+ self.config.get("method", "POST"),
+ self.config["url"],
+ json=payload,
+ headers=headers,
) as response:
if response.status < 400:
return True
except Exception as e:
- if attempt == self.config.get('retry_count', 1) - 1:
+ if attempt == self.config.get("retry_count", 1) - 1:
raise e
await asyncio.sleep(1)
diff --git a/pysentinel/cli/cli.py b/pysentinel/cli/cli.py
new file mode 100644
index 0000000..a6ac9dd
--- /dev/null
+++ b/pysentinel/cli/cli.py
@@ -0,0 +1,95 @@
+#!/usr/bin/env python3
+"""
+PySentinel CLI - Command line interface for running the scanner
+"""
+import argparse
+import asyncio
+import sys
+from pathlib import Path
+
+from pysentinel.core.scanner import Scanner
+from pysentinel.config.loader import load_config
+
+
+def start_scanner_sync(config_path: str) -> None:
+ """Start the scanner synchronously (blocking)"""
+ try:
+ config = load_config(config_path)
+ scanner = Scanner(config)
+ print(f"Starting PySentinel scanner with config: {config_path}")
+ scanner.start()
+ except KeyboardInterrupt:
+ print("\nScanner stopped by user")
+ sys.exit(0)
+ except Exception as e:
+ print(f"Error starting scanner: {e}")
+ sys.exit(1)
+
+
+async def start_scanner_async(config_path: str) -> None:
+ """Start the scanner asynchronously"""
+ try:
+ config = load_config(config_path)
+ scanner = Scanner(config)
+ print(f"Starting PySentinel scanner (async) with config: {config_path}")
+ await scanner.start_async()
+ except KeyboardInterrupt:
+ print("\nScanner stopped by user")
+ sys.exit(0)
+ except Exception as e:
+ print(f"Error starting scanner: {e}")
+ sys.exit(1)
+
+
+def validate_config_file(config_path: str) -> str:
+ """Validate that the config file exists"""
+ path = Path(config_path)
+ if not path.exists():
+ raise argparse.ArgumentTypeError(f"Config file '{config_path}' does not exist")
+ if not path.is_file():
+ raise argparse.ArgumentTypeError(f"'{config_path}' is not a file")
+ return str(path)
+
+
+def main() -> None:
+ """Main CLI entry point"""
+ parser = argparse.ArgumentParser(
+ description="PySentinel - Threshold-based alerting scanner",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Examples:
+ pysentinel config.yml # Run synchronously
+ pysentinel config.yml --async # Run asynchronously
+ pysentinel /path/to/config.json # Use JSON config
+ """,
+ )
+
+ parser.add_argument(
+ "config",
+ type=validate_config_file,
+ help="Path to configuration file (YAML or JSON)",
+ )
+
+ parser.add_argument(
+ "--async",
+ dest="run_async",
+ action="store_true",
+ help="Run scanner asynchronously (non-blocking)",
+ )
+
+ parser.add_argument("--version", action="version", version="PySentinel CLI 0.1.0")
+
+ # Additional validation for async mode
+ if len(sys.argv) > 1 and "--async" in sys.argv and sys.version_info < (3, 7):
+ parser.error("Asynchronous mode requires Python 3.7 or higher")
+
+ args = parser.parse_args()
+
+ if args.run_async:
+ asyncio.run(start_scanner_async(args.config))
+ else:
+ start_scanner_sync(args.config)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/pysentinel/cli/commands.py b/pysentinel/cli/commands.py
deleted file mode 100644
index e69de29..0000000
diff --git a/pysentinel/config/loader.py b/pysentinel/config/loader.py
index fdc5ceb..b6412d5 100644
--- a/pysentinel/config/loader.py
+++ b/pysentinel/config/loader.py
@@ -13,8 +13,8 @@ def load_config(config: Union[str, Dict]):
_config = None
try:
if isinstance(config, str):
- with open(config, 'r') as f:
- if config.endswith('.yaml') or config.endswith('.yml'):
+ with open(config, "r") as f:
+ if config.endswith(".yaml") or config.endswith(".yml"):
_config = yaml.safe_load(f)
else:
_config = json.load(f)
diff --git a/pysentinel/config/validator.py b/pysentinel/config/validator.py
index 4c4af40..b437daf 100644
--- a/pysentinel/config/validator.py
+++ b/pysentinel/config/validator.py
@@ -11,12 +11,14 @@ def sanitize_config(self, config: Dict) -> Dict:
sanitized = config.copy()
# Remove sensitive keys (passwords, tokens, etc.)
- sensitive_keys = ['password', 'token', 'secret', 'key', 'api_key']
+ sensitive_keys = ["password", "token", "secret", "key", "api_key"]
def remove_sensitive(obj):
if isinstance(obj, dict):
- return {k: remove_sensitive(v) if k.lower() not in sensitive_keys else "***"
- for k, v in obj.items()}
+ return {
+ k: remove_sensitive(v) if k.lower() not in sensitive_keys else "***"
+ for k, v in obj.items()
+ }
elif isinstance(obj, list):
return [remove_sensitive(item) for item in obj]
return obj
diff --git a/pysentinel/core/scanner.py b/pysentinel/core/scanner.py
index d5d3b10..306af5f 100644
--- a/pysentinel/core/scanner.py
+++ b/pysentinel/core/scanner.py
@@ -71,30 +71,30 @@ def _setup_from_config(self):
config = self._config
# Setup global configuration
- self._global_config = config.get('global', {})
+ self._global_config = config.get("global", {})
# Setup data sources
- self._setup_datasources(config.get('datasources', {}))
+ self._setup_datasources(config.get("datasources", {}))
# Setup alert channels
- self._setup_channels(config.get('alert_channels', {}))
+ self._setup_channels(config.get("alert_channels", {}))
# Setup alert groups and their alerts
- self._setup_alert_groups(config.get('alert_groups', {}))
+ self._setup_alert_groups(config.get("alert_groups", {}))
def _setup_datasources(self, datasources_config: Dict):
"""Setup data sources from configuration"""
datasource_factories = {
- 'postgresql': PostgreSQLDataSource,
- 'http': HTTPDataSource,
- 'redis': RedisDataSource,
- 'prometheus': PrometheusDataSource,
- 'elasticsearch': ElasticsearchDataSource
+ "postgresql": PostgreSQLDataSource,
+ "http": HTTPDataSource,
+ "redis": RedisDataSource,
+ "prometheus": PrometheusDataSource,
+ "elasticsearch": ElasticsearchDataSource,
}
for name, config in datasources_config.items():
- ds_type = config.get('type')
- ds_enabled = config.get('enabled', False)
+ ds_type = config.get("type")
+ ds_enabled = config.get("enabled", False)
if ds_type in datasource_factories and ds_enabled:
try:
@@ -109,14 +109,14 @@ def _setup_datasources(self, datasources_config: Dict):
def _setup_channels(self, channel_config: Dict):
"""Setup alert channels from configuration"""
channel_factory = {
- 'email': Email,
- 'slack': Slack,
- 'webhook': Webhook,
- 'telegram': Telegram
+ "email": Email,
+ "slack": Slack,
+ "webhook": Webhook,
+ "telegram": Telegram,
}
for name, config in channel_config.items():
- channel_type = config.get('type')
+ channel_type = config.get("type")
if channel_type in channel_factory:
try:
channel = channel_factory[channel_type](name, config)
@@ -130,34 +130,38 @@ def _setup_channels(self, channel_config: Dict):
def _setup_alert_groups(self, alert_groups_config: Dict):
"""Setup alert groups and their alerts from configuration"""
for group_name, group_config in alert_groups_config.items():
- if not group_config.get('enabled', True):
+ if not group_config.get("enabled", True):
continue
self.alert_groups[group_name] = group_config
# Setup alerts within this group
- for alert_config in group_config.get('alerts', []):
+ for alert_config in group_config.get("alerts", []):
try:
alert_def = AlertDefinition(
- name=alert_config['name'],
- metrics=alert_config['metrics'],
- query=alert_config['query'],
- datasource=alert_config['datasource'],
- threshold=alert_config['threshold'],
- severity=Severity(alert_config['severity']),
- interval=alert_config['interval'],
- alert_channels=alert_config['alert_channels'],
- description=alert_config['description'],
- alert_group=group_name
+ name=alert_config["name"],
+ metrics=alert_config["metrics"],
+ query=alert_config["query"],
+ datasource=alert_config["datasource"],
+ threshold=alert_config["threshold"],
+ severity=Severity(alert_config["severity"]),
+ interval=alert_config["interval"],
+ alert_channels=alert_config["alert_channels"],
+ description=alert_config["description"],
+ alert_group=group_name,
)
self.alert_definitions.append(alert_def)
- logger.info(f"Added alert '{alert_def.name}' to group '{group_name}'")
+ logger.info(
+ f"Added alert '{alert_def.name}' to group '{group_name}'"
+ )
except Exception as e:
- logger.error(f"Failed to create alert {alert_config.get('name')}: {e}")
+ logger.error(
+ f"Failed to create alert {alert_config.get('name')}: {e}"
+ )
def _should_send_alert(self, violation: Violation) -> bool:
"""Check if alert should be sent based on cooldown"""
- cooldown_minutes = self._global_config.get('alert_cooldown_minutes', 5)
+ cooldown_minutes = self._global_config.get("alert_cooldown_minutes", 5)
cooldown_key = f"{violation.datasource_name}_{violation.alert_name}"
if cooldown_key in self._alert_cooldowns:
@@ -278,10 +282,14 @@ async def scan_once_async(self):
for datasource_name, alerts in alerts_by_datasource.items():
# Check if the datasource is enabled and exists
if datasource_name not in self.datasources:
- logger.warning(f"Datasource '{datasource_name}' not found, skipping alerts")
+ logger.warning(
+ f"Datasource '{datasource_name}' not found, skipping alerts"
+ )
continue
if not self.datasources[datasource_name].enabled:
- logger.warning(f"Datasource '{datasource_name}' is disabled, skipping alerts")
+ logger.warning(
+ f"Datasource '{datasource_name}' is disabled, skipping alerts"
+ )
continue
if datasource_name in self.datasources:
@@ -297,18 +305,25 @@ async def scan_once_async(self):
scan_duration = time.time() - scan_start
logger.debug(f"Scan completed in {scan_duration:.2f}s")
- def _should_check_alert(self, alert_def: AlertDefinition, current_time: datetime) -> bool:
+ def _should_check_alert(
+ self, alert_def: AlertDefinition, current_time: datetime
+ ) -> bool:
"""Check if an alert should be evaluated based on its interval"""
if not alert_def.enabled:
return False
if alert_def.interval <= 0:
return True
last_run = self._alert_db.get_last_run(alert_def.name)
- if not last_run or (current_time - last_run).total_seconds() >= alert_def.interval:
+ if (
+ not last_run
+ or (current_time - last_run).total_seconds() >= alert_def.interval
+ ):
return True
return False
- async def _check_alerts_for_datasource(self, datasource_name: str, alerts: List[AlertDefinition]):
+ async def _check_alerts_for_datasource(
+ self, datasource_name: str, alerts: List[AlertDefinition]
+ ):
"""Check all alerts for a specific datasource"""
datasource = self.datasources[datasource_name]
@@ -326,7 +341,9 @@ async def _check_alerts_for_datasource(self, datasource_name: str, alerts: List[
# Check threshold
if alert_def.check_threshold(metric_value):
- violation = alert_def.create_violation(metric_value, datasource_name)
+ violation = alert_def.create_violation(
+ metric_value, datasource_name
+ )
await self._handle_violation(violation)
else:
# Clear any existing violation for this alert
@@ -338,16 +355,20 @@ async def _check_alerts_for_datasource(self, datasource_name: str, alerts: List[
metric_data = MetricData(
datasource_name=datasource_name,
metrics=result,
- timestamp=datetime.now()
+ timestamp=datetime.now(),
)
self._latest_metrics[datasource_name] = metric_data
except Exception as e:
- logger.error(f"Error checking alert '{alert_def.name}' on datasource '{datasource_name}': {e}")
+ logger.error(
+ f"Error checking alert '{alert_def.name}' on datasource '{datasource_name}': {e}"
+ )
datasource.error_count += 1
if datasource.error_count >= datasource.max_errors:
- logger.error(f"Disabling datasource {datasource_name} due to too many errors")
+ logger.error(
+ f"Disabling datasource {datasource_name} due to too many errors"
+ )
datasource.enabled = False
async def _handle_violation(self, violation: Violation):
@@ -375,7 +396,9 @@ async def _handle_violation(self, violation: Violation):
logger.error(f"Error in violation callback: {e}")
# Send alerts to configured channels
- alert_def = next((a for a in self.alert_definitions if a.name == violation.alert_name), None)
+ alert_def = next(
+ (a for a in self.alert_definitions if a.name == violation.alert_name), None
+ )
if alert_def:
for channel_name in alert_def.alert_channels:
if channel_name in self.alert_channels:
@@ -422,7 +445,9 @@ async def get_active_alerts_async(self) -> List[Dict]:
async def get_alert_history_async(self, limit: int = 100) -> List[Dict]:
"""Get alert history"""
- recent_violations = self._violation_history[-limit:] if limit else self._violation_history
+ recent_violations = (
+ self._violation_history[-limit:] if limit else self._violation_history
+ )
return [violation.to_dict() for violation in recent_violations]
async def acknowledge_alert_async(self, alert_id: str) -> bool:
@@ -451,12 +476,12 @@ async def update_thresholds_async(self, thresholds_config: List[Dict]):
new_thresholds = []
for config in thresholds_config:
threshold = Threshold(
- metric_name=config['metric'],
- operator=config['operator'],
- value=config['value'],
- severity=Severity(config.get('severity', Severity.WARNING)),
- message=config.get('message'),
- datasource_filter=config.get('datasource_filter')
+ metric_name=config["metric"],
+ operator=config["operator"],
+ value=config["value"],
+ severity=Severity(config.get("severity", Severity.WARNING)),
+ message=config.get("message"),
+ datasource_filter=config.get("datasource_filter"),
)
new_thresholds.append(threshold)
@@ -474,7 +499,9 @@ async def add_datasource_async(self, datasource_config: Dict):
async def remove_datasource_async(self, datasource_name: str):
"""Remove data source dynamically"""
- datasource = next((ds for ds in self.datasources if ds.name == datasource_name), None)
+ datasource = next(
+ (ds for ds in self.datasources if ds.name == datasource_name), None
+ )
if datasource:
await datasource.close()
self.remove_datasource(datasource_name)
@@ -507,8 +534,9 @@ async def stream_metrics_async(self):
# Check for new or updated metrics
for source_name, metrics in current_metrics.items():
- if (source_name not in last_metrics or
- metrics['timestamp'] != last_metrics.get(source_name, {}).get('timestamp')):
+ if source_name not in last_metrics or metrics[
+ "timestamp"
+ ] != last_metrics.get(source_name, {}).get("timestamp"):
yield {source_name: metrics}
last_metrics = current_metrics
diff --git a/pysentinel/core/threshold.py b/pysentinel/core/threshold.py
index 8453c6f..d5ad567 100644
--- a/pysentinel/core/threshold.py
+++ b/pysentinel/core/threshold.py
@@ -8,6 +8,7 @@
@dataclass
class Violation:
"""Represents a threshold violation"""
+
alert_name: str
metric_name: str
current_value: Any
@@ -27,14 +28,15 @@ def __post_init__(self):
def to_dict(self) -> Dict:
data = asdict(self)
- data['severity'] = self.severity.value
- data['timestamp'] = self.timestamp.isoformat()
+ data["severity"] = self.severity.value
+ data["timestamp"] = self.timestamp.isoformat()
return data
@dataclass
class MetricData:
"""Represents collected metric data"""
+
datasource_name: str
metrics: Dict[str, Any]
timestamp: datetime
@@ -42,16 +44,17 @@ class MetricData:
def to_dict(self) -> Dict:
return {
- 'datasource_name': self.datasource_name,
- 'metrics': self.metrics,
- 'timestamp': self.timestamp.isoformat(),
- 'collection_time_ms': self.collection_time_ms
+ "datasource_name": self.datasource_name,
+ "metrics": self.metrics,
+ "timestamp": self.timestamp.isoformat(),
+ "collection_time_ms": self.collection_time_ms,
}
@dataclass
class AlertDefinition:
"""Represents an alert definition from config"""
+
name: str
metrics: str
query: str
@@ -66,8 +69,8 @@ class AlertDefinition:
def create_violation(self, current_value: Any, datasource_name: str) -> Violation:
"""Create a violation from this alert definition"""
- threshold_value = self.threshold.get('max') or self.threshold.get('min')
- operator = '<=' if self.threshold.get('max') else '>='
+ threshold_value = self.threshold.get("max") or self.threshold.get("min")
+ operator = "<=" if self.threshold.get("max") else ">="
return Violation(
alert_name=self.name,
@@ -79,16 +82,16 @@ def create_violation(self, current_value: Any, datasource_name: str) -> Violatio
message=self.description,
timestamp=datetime.now(),
datasource_name=datasource_name,
- alert_group=self.alert_group
+ alert_group=self.alert_group,
)
def check_threshold(self, value: Any) -> bool:
"""Check if a value violates this alert's threshold"""
try:
- if self.threshold.get('max') is not None:
- return float(value) > float(self.threshold['max'])
- elif self.threshold.get('min') is not None:
- return float(value) < float(self.threshold['min'])
+ if self.threshold.get("max") is not None:
+ return float(value) > float(self.threshold["max"])
+ elif self.threshold.get("min") is not None:
+ return float(value) < float(self.threshold["min"])
return False
except (ValueError, TypeError):
return False
@@ -98,8 +101,16 @@ class Threshold:
"""
Represents a threshold for alerting.
"""
- def __init__(self, metric_name: str, operator: str, value: float, severity: Severity = Severity.WARNING,
- message: str = "", datasource_filter: Optional[str] = None):
+
+ def __init__(
+ self,
+ metric_name: str,
+ operator: str,
+ value: float,
+ severity: Severity = Severity.WARNING,
+ message: str = "",
+ datasource_filter: Optional[str] = None,
+ ):
self.metric_name = metric_name
self.operator = operator
self.value = value
diff --git a/pysentinel/datasources/api.py b/pysentinel/datasources/api.py
index 5c90729..cfb1e10 100644
--- a/pysentinel/datasources/api.py
+++ b/pysentinel/datasources/api.py
@@ -17,23 +17,32 @@ async def close(self):
async def fetch_data(self, query: str) -> Dict[str, Any]:
import aiohttp
+
url = f"{self.config['base_url']}{query}"
- headers = self.config.get('headers', {})
+ headers = self.config.get("headers", {})
# Replace environment variables in headers
for key, value in headers.items():
- if isinstance(value, str) and value.startswith('${') and value.endswith('}'):
+ if (
+ isinstance(value, str)
+ and value.startswith("${")
+ and value.endswith("}")
+ ):
env_var = value[2:-1]
headers[key] = os.getenv(env_var, value)
try:
- async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.connection_timeout)) as session:
+ async with aiohttp.ClientSession(
+ timeout=aiohttp.ClientTimeout(total=self.connection_timeout)
+ ) as session:
async with session.get(url, headers=headers) as response:
if response.status == 200:
data = await response.json()
return data
else:
- raise DataSourceException(f"HTTP {response.status}: {await response.text()}")
+ raise DataSourceException(
+ f"HTTP {response.status}: {await response.text()}"
+ )
except Exception as e:
logger.error(f"Error fetching from HTTP API: {e}")
- raise DataSourceException(f"HTTP fetch failed: {e}")
\ No newline at end of file
+ raise DataSourceException(f"HTTP fetch failed: {e}")
diff --git a/pysentinel/datasources/base.py b/pysentinel/datasources/base.py
index 8e9d2d5..205f503 100644
--- a/pysentinel/datasources/base.py
+++ b/pysentinel/datasources/base.py
@@ -10,14 +10,14 @@ class DataSource(ABC):
"""Abstract base class for data sources"""
def __init__(self, name: str, config: Dict, **kwargs):
- self.interval = config.get('interval', 60) # Default to 60 seconds
+ self.interval = config.get("interval", 60) # Default to 60 seconds
self.name = name
self.config = config
- self.enabled = config.get('enabled', False)
+ self.enabled = config.get("enabled", False)
self.last_fetch_time = None
self.error_count = 0
- self.max_errors = config.get('max_retries', 5)
- self.connection_timeout = config.get('timeout', 30)
+ self.max_errors = config.get("max_retries", 5)
+ self.connection_timeout = config.get("timeout", 30)
self._connection = None
@abstractmethod
diff --git a/pysentinel/datasources/database.py b/pysentinel/datasources/database.py
index a7b4d5d..335688e 100644
--- a/pysentinel/datasources/database.py
+++ b/pysentinel/datasources/database.py
@@ -10,7 +10,8 @@ class PostgreSQLDataSource(DataSource):
async def connect(self):
if not self._connection:
import asyncpg
- self._connection = await asyncpg.connect(self.config['connection_string'])
+
+ self._connection = await asyncpg.connect(self.config["connection_string"])
async def close(self):
if self._connection:
@@ -24,4 +25,4 @@ async def fetch_data(self, query: str) -> Dict[str, Any]:
return dict(result) if result else {}
except Exception as e:
logger.error(f"Error executing PostgreSQL query: {e}")
- raise DataSourceException(f"PostgreSQL query failed: {e}")
\ No newline at end of file
+ raise DataSourceException(f"PostgreSQL query failed: {e}")
diff --git a/pysentinel/datasources/elasticsearch.py b/pysentinel/datasources/elasticsearch.py
index 869bbdb..c0cb096 100644
--- a/pysentinel/datasources/elasticsearch.py
+++ b/pysentinel/datasources/elasticsearch.py
@@ -12,7 +12,7 @@ class ElasticsearchDataSource(DataSource):
async def connect(self):
if not self._connection:
self._connection = AsyncElasticsearch(
- self.config['hosts'],
+ self.config["hosts"],
)
async def close(self):
@@ -25,20 +25,19 @@ async def fetch_data(self, query: str) -> Dict[str, Any]:
try:
query_dict = json.loads(query)
result = await self._connection.search(
- index=self.config['index_pattern'],
- body=query_dict
+ index=self.config["index_pattern"], body=query_dict
)
# Extract aggregation results
metrics = {}
- if 'aggregations' in result:
- for agg_name, agg_result in result['aggregations'].items():
- if 'value' in agg_result:
- metrics[agg_name] = agg_result['value']
- elif 'doc_count' in agg_result:
- metrics[agg_name] = agg_result['doc_count']
+ if "aggregations" in result:
+ for agg_name, agg_result in result["aggregations"].items():
+ if "value" in agg_result:
+ metrics[agg_name] = agg_result["value"]
+ elif "doc_count" in agg_result:
+ metrics[agg_name] = agg_result["doc_count"]
return metrics
except Exception as e:
logger.error(f"Error executing Elasticsearch query: {e}")
- raise DataSourceException(f"Elasticsearch query failed: {e}")
\ No newline at end of file
+ raise DataSourceException(f"Elasticsearch query failed: {e}")
diff --git a/pysentinel/datasources/prometheus.py b/pysentinel/datasources/prometheus.py
index 9de602e..04b7442 100644
--- a/pysentinel/datasources/prometheus.py
+++ b/pysentinel/datasources/prometheus.py
@@ -15,23 +15,28 @@ async def close(self):
async def fetch_data(self, query: str) -> Dict[str, Any]:
import aiohttp
+
url = f"{self.config['url']}/api/v1/query"
- params = {'query': query}
+ params = {"query": query}
try:
- async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.connection_timeout)) as session:
+ async with aiohttp.ClientSession(
+ timeout=aiohttp.ClientTimeout(total=self.connection_timeout)
+ ) as session:
async with session.get(url, params=params) as response:
if response.status == 200:
data = await response.json()
- if data['status'] == 'success':
- result = data['data']['result']
+ if data["status"] == "success":
+ result = data["data"]["result"]
if result:
- metric_name = query.split('(')[0].replace('avg', '').strip()
- value = float(result[0]['value'][1])
+ metric_name = (
+ query.split("(")[0].replace("avg", "").strip()
+ )
+ value = float(result[0]["value"][1])
return {metric_name: value}
return {}
else:
raise DataSourceException(f"Prometheus HTTP {response.status}")
except Exception as e:
logger.error(f"Error fetching from Prometheus: {e}")
- raise DataSourceException(f"Prometheus query failed: {e}")
\ No newline at end of file
+ raise DataSourceException(f"Prometheus query failed: {e}")
diff --git a/pysentinel/datasources/redis.py b/pysentinel/datasources/redis.py
index 230eb48..d79d92a 100644
--- a/pysentinel/datasources/redis.py
+++ b/pysentinel/datasources/redis.py
@@ -11,8 +11,9 @@ class RedisDataSource(DataSource):
async def connect(self):
if not self._connection:
import aioredis
- password = self.config.get('password', '')
- if password.startswith('${') and password.endswith('}'):
+
+ password = self.config.get("password", "")
+ if password.startswith("${") and password.endswith("}"):
env_var = password[2:-1]
password = os.getenv(env_var, password)
@@ -30,17 +31,21 @@ async def fetch_data(self, query: str) -> Dict[str, Any]:
try:
if query == "INFO stats":
info = await self._connection.info("stats")
- hit_rate = (info.get('keyspace_hits', 0) / max(
- info.get('keyspace_hits', 0) + info.get('keyspace_misses', 0), 1)) * 100
+ hit_rate = (
+ info.get("keyspace_hits", 0)
+ / max(
+ info.get("keyspace_hits", 0) + info.get("keyspace_misses", 0), 1
+ )
+ ) * 100
return {"hit_rate": hit_rate}
elif query == "INFO memory":
info = await self._connection.info("memory")
- return {"memory_usage": info.get('used_memory_rss', 0)}
+ return {"memory_usage": info.get("used_memory_rss", 0)}
elif query == "INFO clients":
info = await self._connection.info("clients")
- return {"connected_clients": info.get('connected_clients', 0)}
+ return {"connected_clients": info.get("connected_clients", 0)}
else:
return {}
except Exception as e:
logger.error(f"Error executing Redis query: {e}")
- raise DataSourceException(f"Redis query failed: {e}")
\ No newline at end of file
+ raise DataSourceException(f"Redis query failed: {e}")
diff --git a/pysentinel/utils/alert_db.py b/pysentinel/utils/alert_db.py
index 17fac5c..1c252d6 100644
--- a/pysentinel/utils/alert_db.py
+++ b/pysentinel/utils/alert_db.py
@@ -3,29 +3,36 @@
class AlertDB:
- def __init__(self, db_path='alerts.db'):
+ def __init__(self, db_path="alerts.db"):
self.conn = sqlite3.connect(db_path, check_same_thread=False)
self._create_table()
def _create_table(self):
with self.conn:
- self.conn.execute('''
+ self.conn.execute(
+ """
CREATE TABLE IF NOT EXISTS alert_runtime (
alert_name TEXT PRIMARY KEY,
last_run TIMESTAMP
)
- ''')
+ """
+ )
def get_last_run(self, alert_name):
cur = self.conn.cursor()
- cur.execute('SELECT last_run FROM alert_runtime WHERE alert_name=?', (alert_name,))
+ cur.execute(
+ "SELECT last_run FROM alert_runtime WHERE alert_name=?", (alert_name,)
+ )
row = cur.fetchone()
return datetime.fromisoformat(row[0]) if row else None
def update_last_run(self, alert_name, run_time):
with self.conn:
- self.conn.execute('''
+ self.conn.execute(
+ """
INSERT INTO alert_runtime (alert_name, last_run)
VALUES (?, ?)
ON CONFLICT(alert_name) DO UPDATE SET last_run=excluded.last_run
- ''', (alert_name, run_time.isoformat()))
+ """,
+ (alert_name, run_time.isoformat()),
+ )
diff --git a/pysentinel/utils/exception.py b/pysentinel/utils/exception.py
index 8f2dfa4..5bd5c57 100644
--- a/pysentinel/utils/exception.py
+++ b/pysentinel/utils/exception.py
@@ -1,13 +1,16 @@
class ScannerException(Exception):
"""Base exception for scanner errors"""
+
pass
class DataSourceException(ScannerException):
"""Exception for data source related errors"""
+
pass
class ThresholdException(ScannerException):
"""Exception for threshold related errors"""
+
pass
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 0000000..b9705d7
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,30 @@
+import pytest
+import asyncio
+from pathlib import Path
+
+
+@pytest.fixture(scope="session")
+def event_loop():
+ """Create an instance of the default event loop for the test session."""
+ loop = asyncio.get_event_loop_policy().new_event_loop()
+ yield loop
+ loop.close()
+
+
+@pytest.fixture
+def sample_config():
+ """Sample configuration for tests"""
+ return {
+ "scanner": {"interval": 30, "timeout": 10},
+ "alerts": {"email": {"enabled": True, "smtp_server": "smtp.example.com"}},
+ }
+
+
+@pytest.fixture
+def temp_config_file(tmp_path, sample_config):
+ """Create temporary config file for tests"""
+ config_file = tmp_path / "test_config.yml"
+ import yaml
+
+ config_file.write_text(yaml.dump(sample_config))
+ return config_file
diff --git a/tests/pysentinel/cli/test_cli.py b/tests/pysentinel/cli/test_cli.py
new file mode 100644
index 0000000..e12c2ad
--- /dev/null
+++ b/tests/pysentinel/cli/test_cli.py
@@ -0,0 +1,250 @@
+import pytest
+import asyncio
+from unittest.mock import Mock, patch, AsyncMock
+from pathlib import Path
+import argparse
+import sys
+from io import StringIO
+
+from pysentinel.cli.cli import (
+ main,
+ start_scanner_sync,
+ start_scanner_async,
+ validate_config_file,
+)
+
+
+class TestValidateConfigFile:
+ """Test config file validation"""
+
+ def test_valid_config_file(self, tmp_path):
+ """Test validation with valid config file"""
+ config_file = tmp_path / "config.yml"
+ config_file.write_text("test: config")
+
+ result = validate_config_file(str(config_file))
+ assert result == str(config_file)
+
+ def test_nonexistent_config_file(self):
+ """Test validation with non-existent config file"""
+ with pytest.raises(argparse.ArgumentTypeError, match="does not exist"):
+ validate_config_file("nonexistent.yml")
+
+ def test_directory_instead_of_file(self, tmp_path):
+ """Test validation when path is directory not file"""
+ with pytest.raises(argparse.ArgumentTypeError, match="is not a file"):
+ validate_config_file(str(tmp_path))
+
+
+class TestStartScannerSync:
+ """Test synchronous scanner startup"""
+
+ @patch("pysentinel.cli.cli.load_config")
+ @patch("pysentinel.cli.cli.Scanner")
+ def test_start_scanner_sync_success(self, mock_scanner_class, mock_load_config):
+ """Test successful synchronous scanner start"""
+ mock_config = {"test": "config"}
+ mock_load_config.return_value = mock_config
+ mock_scanner = Mock()
+ mock_scanner_class.return_value = mock_scanner
+
+ start_scanner_sync("config.yml")
+
+ mock_load_config.assert_called_once_with("config.yml")
+ mock_scanner_class.assert_called_once_with(mock_config)
+ mock_scanner.start.assert_called_once()
+
+ @patch("pysentinel.cli.cli.load_config")
+ @patch("builtins.print")
+ def test_start_scanner_sync_load_config_error(self, mock_print, mock_load_config):
+ """Test error during config loading"""
+ mock_load_config.side_effect = Exception("Config error")
+
+ with pytest.raises(SystemExit) as exc_info:
+ start_scanner_sync("config.yml")
+
+ assert exc_info.value.code == 1
+ mock_print.assert_called_with("Error starting scanner: Config error")
+
+ @patch("pysentinel.cli.cli.load_config")
+ @patch("pysentinel.cli.cli.Scanner")
+ @patch("builtins.print")
+ def test_start_scanner_sync_keyboard_interrupt(
+ self, mock_print, mock_scanner_class, mock_load_config
+ ):
+ """Test keyboard interrupt handling"""
+ mock_load_config.return_value = {"test": "config"}
+ mock_scanner = Mock()
+ mock_scanner.start.side_effect = KeyboardInterrupt()
+ mock_scanner_class.return_value = mock_scanner
+
+ with pytest.raises(SystemExit) as exc_info:
+ start_scanner_sync("config.yml")
+
+ assert exc_info.value.code == 0
+ mock_print.assert_called_with("\nScanner stopped by user")
+
+
+class TestStartScannerAsync:
+ """Test asynchronous scanner startup"""
+
+ @patch("pysentinel.cli.cli.load_config")
+ @patch("pysentinel.cli.cli.Scanner")
+ @pytest.mark.asyncio
+ async def test_start_scanner_async_success(
+ self, mock_scanner_class, mock_load_config
+ ):
+ """Test successful asynchronous scanner start"""
+ mock_config = {"test": "config"}
+ mock_load_config.return_value = mock_config
+ mock_scanner = Mock()
+ mock_scanner.start_async = AsyncMock()
+ mock_scanner_class.return_value = mock_scanner
+
+ await start_scanner_async("config.yml")
+
+ mock_load_config.assert_called_once_with("config.yml")
+ mock_scanner_class.assert_called_once_with(mock_config)
+ mock_scanner.start_async.assert_called_once()
+
+ @patch("pysentinel.cli.cli.load_config")
+ @patch("builtins.print")
+ @pytest.mark.asyncio
+ async def test_start_scanner_async_load_config_error(
+ self, mock_print, mock_load_config
+ ):
+ """Test error during async config loading"""
+ mock_load_config.side_effect = Exception("Config error")
+
+ with pytest.raises(SystemExit) as exc_info:
+ await start_scanner_async("config.yml")
+
+ assert exc_info.value.code == 1
+ mock_print.assert_called_with("Error starting scanner: Config error")
+
+ @patch("pysentinel.cli.cli.load_config")
+ @patch("pysentinel.cli.cli.Scanner")
+ @patch("builtins.print")
+ @pytest.mark.asyncio
+ async def test_start_scanner_async_keyboard_interrupt(
+ self, mock_print, mock_scanner_class, mock_load_config
+ ):
+ """Test keyboard interrupt handling in async mode"""
+ mock_load_config.return_value = {"test": "config"}
+ mock_scanner = Mock()
+ mock_scanner.start_async = AsyncMock(side_effect=KeyboardInterrupt())
+ mock_scanner_class.return_value = mock_scanner
+
+ with pytest.raises(SystemExit) as exc_info:
+ await start_scanner_async("config.yml")
+
+ assert exc_info.value.code == 0
+ mock_print.assert_called_with("\nScanner stopped by user")
+
+
+class TestMainCLI:
+ """Test main CLI function"""
+
+ @patch("pysentinel.cli.cli.start_scanner_sync")
+ @patch("sys.argv", ["pysentinel", "config.yml"])
+ def test_main_sync_mode(self, mock_start_sync, tmp_path):
+ """Test main function in sync mode"""
+ config_file = tmp_path / "config.yml"
+ config_file.write_text("test: config")
+
+ with patch("sys.argv", ["pysentinel", str(config_file)]):
+ main()
+
+ mock_start_sync.assert_called_once_with(str(config_file))
+
+ @patch("pysentinel.cli.cli.asyncio.run")
+ @patch("pysentinel.cli.cli.start_scanner_async")
+ def test_main_async_mode(self, mock_start_async, mock_asyncio_run, tmp_path):
+ """Test main function in async mode"""
+ config_file = tmp_path / "config.yml"
+ config_file.write_text("test: config")
+
+ with patch("sys.argv", ["pysentinel", str(config_file), "--async"]):
+ main()
+
+ mock_asyncio_run.assert_called_once()
+
+ def test_main_help_output(self, capsys):
+ """Test help output"""
+ with patch("sys.argv", ["pysentinel", "--help"]):
+ with pytest.raises(SystemExit) as exc_info:
+ main()
+
+ assert exc_info.value.code == 0
+ captured = capsys.readouterr()
+ assert "PySentinel - Threshold-based alerting scanner" in captured.out
+ assert "Run synchronously" in captured.out
+ assert "Run asynchronously" in captured.out
+
+ def test_main_version_output(self, capsys):
+ """Test version output"""
+ with patch("sys.argv", ["pysentinel", "--version"]):
+ with pytest.raises(SystemExit) as exc_info:
+ main()
+
+ assert exc_info.value.code == 0
+ captured = capsys.readouterr()
+ assert "PySentinel CLI 0.1.0" in captured.out
+
+ def test_main_invalid_config_file(self, capsys):
+ """Test main with invalid config file"""
+ with patch("sys.argv", ["pysentinel", "nonexistent.yml"]):
+ with pytest.raises(SystemExit) as exc_info:
+ main()
+
+ assert exc_info.value.code == 2 # argparse error code
+ captured = capsys.readouterr()
+ assert "does not exist" in captured.err
+
+ def test_main_no_arguments(self, capsys):
+ """Test main with no arguments"""
+ with patch("sys.argv", ["pysentinel"]):
+ with pytest.raises(SystemExit) as exc_info:
+ main()
+
+ assert exc_info.value.code == 2 # argparse error code
+ captured = capsys.readouterr()
+ assert "required" in captured.err.lower()
+
+
+class TestCLIIntegration:
+ """Integration tests for CLI"""
+
+ @patch("pysentinel.cli.cli.load_config")
+ @patch("pysentinel.cli.cli.Scanner")
+ def test_full_sync_workflow(self, mock_scanner_class, mock_load_config, tmp_path):
+ """Test complete synchronous workflow"""
+ config_file = tmp_path / "config.yml"
+ config_file.write_text("scanner:\n interval: 10")
+
+ mock_config = {"scanner": {"interval": 10}}
+ mock_load_config.return_value = mock_config
+ mock_scanner = Mock()
+ mock_scanner_class.return_value = mock_scanner
+
+ with patch("sys.argv", ["pysentinel", str(config_file)]):
+ main()
+
+ mock_load_config.assert_called_once_with(str(config_file))
+ mock_scanner_class.assert_called_once_with(mock_config)
+ mock_scanner.start.assert_called_once()
+
+ @patch("pysentinel.cli.cli.load_config")
+ @patch("pysentinel.cli.cli.Scanner")
+ @patch("pysentinel.cli.cli.asyncio.run")
+ def test_full_async_workflow(
+ self, mock_asyncio_run, mock_scanner_class, mock_load_config, tmp_path
+ ):
+ """Test complete asynchronous workflow"""
+ config_file = tmp_path / "config.yml"
+ config_file.write_text("scanner:\n interval: 10")
+
+ with patch("sys.argv", ["pysentinel", str(config_file), "--async"]):
+ main()
+
+ mock_asyncio_run.assert_called_once()
diff --git a/tests/pysentinel/core/test_scanner.py b/tests/pysentinel/core/test_scanner.py
index 4f8ca62..e8289a5 100644
--- a/tests/pysentinel/core/test_scanner.py
+++ b/tests/pysentinel/core/test_scanner.py
@@ -15,11 +15,14 @@ def minimal_config():
return {
"global": {"log_level": "INFO"},
"datasources": {
- "testdb": {"type": "postgresql", "enabled": True, "interval": 60, "config": {"host": "localhost", "port": 5432}},
- },
- "alert_channels": {
- "email1": {"type": "email"}
+ "testdb": {
+ "type": "postgresql",
+ "enabled": True,
+ "interval": 60,
+ "config": {"host": "localhost", "port": 5432},
+ },
},
+ "alert_channels": {"email1": {"type": "email"}},
"alert_groups": {
"group1": {
"enabled": True,
@@ -33,11 +36,11 @@ def minimal_config():
"severity": "critical",
"interval": 60,
"alert_channels": ["email1"],
- "description": "CPU usage high"
+ "description": "CPU usage high",
}
- ]
+ ],
}
- }
+ },
}
@@ -89,7 +92,7 @@ async def test_handle_violation_sends_alert():
severity=Severity("critical"),
interval=60,
alert_channels=["chan"],
- description="desc"
+ description="desc",
)
scanner.alert_definitions = [alert_def]
@@ -107,6 +110,7 @@ def test_get_uptime_seconds(monkeypatch):
scanner = Scanner()
assert scanner.get_uptime_seconds() == 0
from datetime import datetime, timedelta
+
scanner.start_time = datetime.now() - timedelta(seconds=10)
assert 9 <= scanner.get_uptime_seconds() <= 11
@@ -116,7 +120,7 @@ def test_get_datasources():
scanner.datasources = [
MagicMock(name="ds1"),
MagicMock(name="ds2", type="postgresql"),
- MagicMock(name="ds3", type="mysql")
+ MagicMock(name="ds3", type="mysql"),
]
datasources = scanner.get_datasources()
assert len(datasources) == 3
@@ -129,7 +133,7 @@ def test_get_metric_count_async():
scanner = Scanner()
scanner._latest_metrics = {
"ds1": MagicMock(metrics={"a": 1, "b": 2}),
- "ds2": MagicMock(metrics={"c": 3})
+ "ds2": MagicMock(metrics={"c": 3}),
}
assert scanner.get_metric_count_async() == 3
@@ -137,6 +141,7 @@ def test_get_metric_count_async():
@pytest.fixture
def config_from_yaml():
import os
+
config_path = os.path.join(os.path.dirname(__file__), "../fixtures/config.yml")
with open(config_path, "r") as f:
return yaml.safe_load(f)
@@ -174,7 +179,9 @@ async def test_scan_once_async_groups_alerts_by_datasource():
scanner._should_check_alert = MagicMock(return_value=True)
scanner._check_alerts_for_datasource = AsyncMock()
await scanner.scan_once_async()
- scanner._check_alerts_for_datasource.assert_any_await("ds1", [alert_def_1, alert_def_3])
+ scanner._check_alerts_for_datasource.assert_any_await(
+ "ds1", [alert_def_1, alert_def_3]
+ )
scanner._check_alerts_for_datasource.assert_any_await("ds2", [alert_def_2])
@@ -233,6 +240,7 @@ async def test_scanner_does_not_send_alert_if_cooldown_active():
mock_channel.send_alert = AsyncMock()
scanner.alert_channels = {"chan": mock_channel}
from pysentinel.core.threshold import AlertDefinition
+
alert_def = AlertDefinition(
name="alert",
metrics="cpu",
@@ -242,7 +250,7 @@ async def test_scanner_does_not_send_alert_if_cooldown_active():
severity=Severity("critical"),
interval=60,
alert_channels=["chan"],
- description="desc"
+ description="desc",
)
scanner.alert_definitions = [alert_def]
await scanner._handle_violation(violation)
diff --git a/tests/pysentinel/core/test_threshold.py b/tests/pysentinel/core/test_threshold.py
index 745c6b7..51d5f39 100644
--- a/tests/pysentinel/core/test_threshold.py
+++ b/tests/pysentinel/core/test_threshold.py
@@ -16,7 +16,7 @@ def test_violation_creation_and_to_dict(self):
severity=Severity.CRITICAL,
message="CPU usage is too high",
timestamp=now,
- datasource_name="server1"
+ datasource_name="server1",
)
d = v.to_dict()
assert d["alert_name"] == "CPU High"
@@ -30,7 +30,7 @@ def test_metric_data_to_dict(self):
datasource_name="db1",
metrics={"connections": 10},
timestamp=now,
- collection_time_ms=123.4
+ collection_time_ms=123.4,
)
d = m.to_dict()
assert d["datasource_name"] == "db1"
@@ -48,7 +48,7 @@ def test_alert_definition_create_violation(self):
severity=Severity.WARNING,
interval=60,
alert_channels=["email"],
- description="Memory is low"
+ description="Memory is low",
)
v = ad.create_violation(current_value=80, datasource_name="server2")
assert isinstance(v, Violation)
@@ -67,7 +67,7 @@ def test_alert_definition_check_threshold_max(self):
severity=Severity.CRITICAL,
interval=60,
alert_channels=["slack"],
- description="Disk usage is high"
+ description="Disk usage is high",
)
assert ad.check_threshold(85) is True
assert ad.check_threshold(75) is False
@@ -82,7 +82,7 @@ def test_alert_definition_check_threshold_min(self):
severity=Severity.WARNING,
interval=60,
alert_channels=["telegram"],
- description="Temperature is low"
+ description="Temperature is low",
)
assert ad.check_threshold(5) is True
assert ad.check_threshold(15) is False
@@ -97,9 +97,7 @@ def test_alert_definition_check_threshold_invalid(self):
severity=Severity.INFO,
interval=60,
alert_channels=[],
- description="No threshold"
+ description="No threshold",
)
assert ad.check_threshold("not_a_number") is False
assert ad.check_threshold(None) is False
-
-