diff --git a/plugins/config.yaml b/plugins/config.yaml index e1a0ecb36..b641a0c21 100644 --- a/plugins/config.yaml +++ b/plugins/config.yaml @@ -448,19 +448,19 @@ plugins: tool_allowlist: ["search", "retrieve"] resource_uri_prefixes: ["http://", "https://"] - # ClamAV Remote Scanner (external MCP) - - name: "ClamAVRemote" - kind: "external" - description: "External ClamAV scanner (file/text) via MCP STDIO" - version: "0.1.0" - author: "Mihai Criveti" - hooks: ["resource_pre_fetch", "resource_post_fetch"] - tags: ["security", "malware", "clamav"] - mode: "enforce" - priority: 62 - mcp: - proto: STDIO - script: plugins/external/clamav_server/run.sh + # ClamAV Remote Scanner (external MCP) - DISABLED due to script path issue + # - name: "ClamAVRemote" + # kind: "external" + # description: "External ClamAV scanner (file/text) via MCP STDIO" + # version: "0.1.0" + # author: "Mihai Criveti" + # hooks: ["resource_pre_fetch", "resource_post_fetch"] + # tags: ["security", "malware", "clamav"] + # mode: "enforce" + # priority: 62 + # mcp: + # proto: STDIO + # script: plugins/external/clamav_server/run.sh # - name: "OPAPluginFilter" # kind: "external" @@ -558,24 +558,24 @@ plugins: direction: "to_user" fields: ["start_time", "end_time"] - # AI Artifacts Normalizer - clean smart quotes, ligatures, bidi controls - - name: "AIArtifactsNormalizer" - kind: "plugins.ai_artifacts_normalizer.ai_artifacts_normalizer.AIArtifactsNormalizerPlugin" - description: "Normalize AI artifacts: smart quotes, ligatures, dashes, ellipses; remove bidi/zero-width; collapse spacing" - version: "0.1.0" - author: "MCP Context Forge Team" - hooks: ["prompt_pre_fetch", "resource_post_fetch", "tool_post_invoke"] - tags: ["normalize", "unicode", "safety"] - mode: "permissive" - priority: 138 - conditions: [] - config: - replace_smart_quotes: true - replace_ligatures: true - remove_bidi_controls: true - collapse_spacing: true - normalize_dashes: true - normalize_ellipsis: true + # AI Artifacts Normalizer - DISABLED due to syntax error in plugin + # - name: "AIArtifactsNormalizer" + # kind: "plugins.ai_artifacts_normalizer.ai_artifacts_normalizer.AIArtifactsNormalizerPlugin" + # description: "Normalize AI artifacts: smart quotes, ligatures, dashes, ellipses; remove bidi/zero-width; collapse spacing" + # version: "0.1.0" + # author: "MCP Context Forge Team" + # hooks: ["prompt_pre_fetch", "resource_post_fetch", "tool_post_invoke"] + # tags: ["normalize", "unicode", "safety"] + # mode: "permissive" + # priority: 138 + # conditions: [] + # config: + # replace_smart_quotes: true + # replace_ligatures: true + # remove_bidi_controls: true + # collapse_spacing: true + # normalize_dashes: true + # normalize_ellipsis: true # SQL Sanitizer - detect dangerous SQL patterns in inputs - name: "SQLSanitizer" @@ -750,3 +750,42 @@ plugins: vault_header_name: "X-Vault-Tokens" vault_handling: "raw" system_handling: "tag" + + # Webhook Notification Plugin - Send HTTP notifications on events + - name: "WebhookNotification" + kind: "plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin" + description: "Send HTTP webhook notifications on events, violations, and state changes" + version: "1.0.0" + author: "Claude Code Assistant" + hooks: ["tool_pre_invoke", "tool_post_invoke", "prompt_post_fetch", "resource_post_fetch"] + tags: ["notification", "webhook", "monitoring", "observability"] + mode: "permissive" + priority: 900 # Run after other plugins to capture their violations + conditions: [] + config: + webhooks: + # Webhook.site for testing - UPDATE WITH YOUR UNIQUE URL + - url: "https://webhook.site/38df7ac1-672a-4e49-9b55-3c0ed469e9d0" + events: ["violation", "tool_success", "tool_error", "rate_limit_exceeded", "pii_detected"] + authentication: + type: "none" + retry_attempts: 2 + retry_delay: 1000 + timeout: 10 + enabled: true + + # Simple default template + default_template: | + { + "event": "{{event}}", + "plugin": "{{plugin_name}}", + "timestamp": "{{timestamp}}", + "request_id": "{{request_id}}", + "user": "{{user}}", + "tenant_id": "{{tenant_id}}", + "violation": {{violation}}, + "metadata": {{metadata}} + } + + include_payload_data: false + max_payload_size: 1000 diff --git a/plugins/webhook_notification/README.md b/plugins/webhook_notification/README.md new file mode 100644 index 000000000..eb99acfbc --- /dev/null +++ b/plugins/webhook_notification/README.md @@ -0,0 +1,222 @@ +# Webhook Notification Plugin + +> Author: Claude Code Assistant +> Version: 1.0.0 + +Sends HTTP webhook notifications on specific events, violations, or state changes. Supports multiple webhooks, event filtering, retry logic with exponential backoff, and various authentication methods. + +## Hooks +- `prompt_pre_fetch` +- `prompt_post_fetch` +- `tool_pre_invoke` +- `tool_post_invoke` +- `resource_pre_fetch` +- `resource_post_fetch` + +## Event Types +- `violation` - General plugin violations +- `rate_limit_exceeded` - Rate limiting violations +- `pii_detected` - PII detection violations +- `harmful_content` - Harmful content violations +- `tool_success` - Successful tool invocations +- `tool_error` - Failed tool invocations +- `prompt_success` - Successful prompt fetches +- `resource_success` - Successful resource fetches +- `plugin_error` - Plugin execution errors + +## Authentication Methods +- `none` - No authentication +- `bearer` - Bearer token authentication +- `api_key` - API key in custom header +- `hmac` - HMAC signature authentication + +## Configuration + +### Basic Configuration +```yaml +config: + webhooks: + - url: "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" + events: ["violation", "rate_limit_exceeded"] + authentication: + type: "none" + retry_attempts: 3 + retry_delay: 1000 + timeout: 10 + enabled: true +``` + +### Advanced Configuration with Authentication +```yaml +config: + webhooks: + # Slack webhook for violations + - url: "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" + events: ["violation", "pii_detected", "harmful_content"] + authentication: + type: "bearer" + token: "${env.SLACK_WEBHOOK_TOKEN}" + retry_attempts: 3 + retry_delay: 1000 + + # API endpoint with HMAC authentication + - url: "https://api.example.com/webhooks/mcp-gateway" + events: ["tool_success", "tool_error"] + authentication: + type: "hmac" + hmac_secret: "${env.WEBHOOK_HMAC_SECRET}" + hmac_algorithm: "sha256" + hmac_header: "X-Hub-Signature-256" + retry_attempts: 5 + retry_delay: 2000 + timeout: 30 + + # Monitoring service with API key + - url: "https://monitoring.service.com/events" + events: ["violation", "tool_error", "plugin_error"] + authentication: + type: "api_key" + api_key: "${env.MONITORING_API_KEY}" + api_key_header: "X-API-Key" + retry_attempts: 2 + retry_delay: 500 + + # Custom payload templates per event type + payload_templates: + violation: | + { + "alert": "MCP Gateway Violation", + "severity": "warning", + "message": "{{violation.reason}}: {{violation.description}}", + "plugin": "{{plugin_name}}", + "timestamp": "{{timestamp}}", + "request_id": "{{request_id}}", + "user": "{{user}}", + "details": {{violation}} + } + + rate_limit_exceeded: | + { + "alert": "Rate Limit Exceeded", + "severity": "error", + "message": "Rate limit exceeded for {{user}}", + "timestamp": "{{timestamp}}", + "request_id": "{{request_id}}", + "violation_details": {{violation}} + } + + # Include request payload data in notifications (be careful with sensitive data) + include_payload_data: false + max_payload_size: 1000 +``` + +## Template Variables + +The following variables are available in payload templates: + +- `{{event}}` - Event type (violation, tool_success, etc.) +- `{{plugin_name}}` - Name of the webhook plugin +- `{{timestamp}}` - ISO timestamp of the event +- `{{request_id}}` - Unique request identifier +- `{{user}}` - User identifier (if available) +- `{{tenant_id}}` - Tenant identifier (if available) +- `{{server_id}}` - Server identifier (if available) +- `{{violation}}` - Full violation object (JSON) +- `{{metadata}}` - Event metadata (JSON) +- `{{payload}}` - Request payload data (if enabled, JSON) + +## Features + +### Retry Logic +- Configurable retry attempts (0-10) +- Exponential backoff starting from configured delay +- Individual timeout per webhook request +- Comprehensive error logging + +### Authentication Support +- **Bearer Token**: `Authorization: Bearer ` +- **API Key**: Custom header with API key +- **HMAC**: Request signing with configurable algorithm +- **Environment Variables**: Secure credential management + +### Event Filtering +- Subscribe to specific event types per webhook +- Smart event detection based on violation details +- Separate templates for different event types + +### Concurrent Delivery +- Multiple webhooks sent concurrently +- Non-blocking execution (fire-and-forget) +- Individual retry logic per webhook + +## Security Considerations + +1. **Credential Management**: Use environment variables for sensitive data +2. **Payload Size**: Limit included payload data to prevent sensitive data leaks +3. **HMAC Signatures**: Verify webhook authenticity on receiving end +4. **Network Security**: Use HTTPS endpoints only +5. **Timeout Configuration**: Prevent hanging requests + +## Example Webhook Payloads + +### Violation Event +```json +{ + "event": "violation", + "plugin": "WebhookNotificationPlugin", + "timestamp": "2025-01-15T10:30:45.123Z", + "request_id": "req-12345", + "user": "user@example.com", + "tenant_id": "tenant-abc", + "server_id": "server-xyz", + "violation": { + "reason": "Rate limit exceeded", + "description": "User user@example.com rate limit exceeded", + "code": "RATE_LIMIT", + "details": {"remaining": 0, "reset_in": 45} + }, + "metadata": {} +} +``` + +### Tool Success Event +```json +{ + "event": "tool_success", + "plugin": "WebhookNotificationPlugin", + "timestamp": "2025-01-15T10:30:45.123Z", + "request_id": "req-12346", + "user": "user@example.com", + "violation": null, + "metadata": {"tool_name": "search"} +} +``` + +## Usage Tips + +1. **Start Simple**: Begin with basic Slack webhooks for violations +2. **Test Thoroughly**: Use tools like ngrok for local webhook testing +3. **Monitor Logs**: Check gateway logs for webhook delivery status +4. **Gradual Rollout**: Enable events incrementally to avoid notification spam +5. **Template Testing**: Test payload templates with various event types + +## Performance Notes + +- Webhooks are sent asynchronously and don't block request processing +- Failed webhooks are retried with exponential backoff +- HTTP client connection pooling optimizes performance +- Memory usage scales with number of concurrent webhook deliveries + +## Troubleshooting + +### Common Issues +- **Authentication Failures**: Verify tokens and secrets in environment variables +- **Timeout Errors**: Increase timeout values for slow webhook endpoints +- **Template Errors**: Check Jinja2 syntax in custom templates +- **SSL Errors**: Ensure webhook URLs use valid HTTPS certificates + +### Debug Logging +Enable debug logging to see webhook delivery attempts: +```bash +export LOG_LEVEL=DEBUG +``` \ No newline at end of file diff --git a/plugins/webhook_notification/TESTING.md b/plugins/webhook_notification/TESTING.md new file mode 100644 index 000000000..2aed270fa --- /dev/null +++ b/plugins/webhook_notification/TESTING.md @@ -0,0 +1,430 @@ +# Testing the Webhook Notification Plugin + +This guide covers comprehensive testing strategies for the Webhook Notification Plugin at unit, integration, and end-to-end levels. + +## Test Structure + +``` +tests/unit/mcpgateway/plugins/plugins/webhook_notification/ +├── test_webhook_notification.py # Unit tests +├── test_webhook_integration.py # Integration tests +└── __init__.py # Test package init +``` + +## 1. Unit Tests + +### Running Unit Tests +```bash +# Run all webhook plugin tests +pytest tests/unit/mcpgateway/plugins/plugins/webhook_notification/ -v + +# Run specific test file +pytest tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_notification.py -v + +# Run with coverage +pytest tests/unit/mcpgateway/plugins/plugins/webhook_notification/ --cov=plugins.webhook_notification --cov-report=html +``` + +### Unit Test Coverage +- ✅ Plugin initialization and configuration +- ✅ Template rendering with context variables +- ✅ HMAC signature creation +- ✅ Webhook delivery success scenarios +- ✅ Authentication methods (Bearer, API Key, HMAC) +- ✅ Retry logic on failures +- ✅ Event type determination +- ✅ Hook execution (tool_post_invoke, prompt_post_fetch, etc.) +- ✅ Event filtering by webhook configuration +- ✅ Disabled webhook handling +- ✅ Custom payload templates +- ✅ Payload size limiting + +### Key Test Cases + +#### Authentication Testing +```python +# Test Bearer token authentication +config = { + "webhooks": [{ + "authentication": { + "type": "bearer", + "token": "test-token-123" + } + }] +} + +# Test API key authentication +config = { + "webhooks": [{ + "authentication": { + "type": "api_key", + "api_key": "test-key", + "api_key_header": "X-API-Key" + } + }] +} +``` + +#### Retry Logic Testing +```python +# Mock HTTP client to return 500 errors +mock_response.status_code = 500 +mock_client.post.return_value = mock_response + +# Verify retries (1 initial + 3 retries = 4 total calls) +assert mock_client.post.call_count == 4 +``` + +## 2. Integration Tests + +### Running Integration Tests +```bash +# Run integration tests +pytest tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_integration.py -v + +# Run with plugin manager setup +pytest tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_integration.py::test_webhook_plugin_with_manager -v +``` + +### Integration Test Scenarios +- ✅ Plugin manager initialization with webhook plugin +- ✅ End-to-end hook execution through plugin manager +- ✅ Multiple webhook delivery +- ✅ Custom template usage +- ✅ Plugin interaction with other plugins (PII filter, rate limiter) + +### Sample Integration Test Config +```yaml +plugins: + - name: "WebhookNotification" + kind: "plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin" + hooks: ["tool_post_invoke"] + mode: "permissive" + priority: 900 + config: + webhooks: + - url: "https://test.example.com/webhook" + events: ["tool_success"] + authentication: + type: "bearer" + token: "test-token" +``` + +## 3. Manual Testing + +### Local Webhook Server Setup + +#### Using Python HTTP Server +```bash +# Simple webhook receiver +cat << 'EOF' > webhook_server.py +#!/usr/bin/env python3 +import json +from http.server import HTTPServer, BaseHTTPRequestHandler + +class WebhookHandler(BaseHTTPRequestHandler): + def do_POST(self): + content_length = int(self.headers['Content-Length']) + post_data = self.rfile.read(content_length) + + print(f"\n--- Webhook Received ---") + print(f"Headers: {dict(self.headers)}") + print(f"Body: {post_data.decode('utf-8')}") + + # Parse and pretty print JSON + try: + data = json.loads(post_data.decode('utf-8')) + print(f"Parsed JSON:") + print(json.dumps(data, indent=2)) + except: + pass + + print("--- End Webhook ---\n") + + self.send_response(200) + self.end_headers() + self.wfile.write(b'OK') + +if __name__ == '__main__': + server = HTTPServer(('localhost', 3000), WebhookHandler) + print("Webhook server running on http://localhost:3000") + server.serve_forever() +EOF + +python3 webhook_server.py +``` + +#### Using ngrok for Public URLs +```bash +# Install ngrok (if not installed) +# Mac: brew install ngrok +# Linux: snap install ngrok + +# Expose local webhook server +ngrok http 3000 + +# Use the https URL from ngrok in your webhook config +``` + +### Manual Testing Steps + +1. **Start Local Webhook Server** + ```bash + python3 webhook_server.py + ``` + +2. **Configure Test Environment** + ```bash + cp plugins/webhook_notification/test_config.yaml plugins/config.yaml + # Edit the webhook URL to point to your local server + ``` + +3. **Start MCP Gateway** + ```bash + export PLUGINS_ENABLED=true + export PLUGIN_CONFIG_FILE=plugins/config.yaml + make dev + ``` + +4. **Generate Test Bearer Token** + ```bash + export MCPGATEWAY_BEARER_TOKEN=$(python -m mcpgateway.utils.create_jwt_token --username test@example.com --exp 60 --secret test-secret) + ``` + +5. **Trigger Webhook Events** + + **Tool Success Event:** + ```bash + curl -X POST -H "Authorization: Bearer $MCPGATEWAY_BEARER_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","id":1,"method":"test_tool","params":{"query":"hello world"}}' \ + http://localhost:8000/ + ``` + + **PII Detection Event:** + ```bash + curl -X POST -H "Authorization: Bearer $MCPGATEWAY_BEARER_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","id":2,"method":"search","params":{"query":"my email is john@example.com"}}' \ + http://localhost:8000/ + ``` + + **Rate Limit Event (call rapidly):** + ```bash + for i in {1..10}; do + curl -X POST -H "Authorization: Bearer $MCPGATEWAY_BEARER_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","id":'$i',"method":"test_tool","params":{"query":"test"}}' \ + http://localhost:8000/ & + done + wait + ``` + +## 4. External Service Testing + +### Testing with Webhook.site +1. Visit https://webhook.site +2. Copy your unique URL +3. Update config with the URL: + ```yaml + webhooks: + - url: "https://webhook.site/YOUR-UNIQUE-ID" + events: ["violation", "tool_success"] + enabled: true + ``` + +### Testing with Slack +1. Create a Slack webhook in your workspace +2. Set environment variable: + ```bash + export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK" + ``` +3. Enable Slack webhook in config: + ```yaml + webhooks: + - url: "${env.SLACK_WEBHOOK_URL}" + events: ["violation", "pii_detected"] + enabled: true + ``` + +## 5. Performance Testing + +### Load Testing Webhook Delivery +```python +# Test concurrent webhook delivery +import asyncio +import time +from plugins.webhook_notification.webhook_notification import WebhookNotificationPlugin + +async def test_concurrent_webhooks(): + plugin = create_test_plugin() + + # Send 100 concurrent notifications + tasks = [] + start_time = time.time() + + for i in range(100): + context = create_test_context(f"user-{i}") + task = plugin._notify_webhooks(EventType.TOOL_SUCCESS, context) + tasks.append(task) + + await asyncio.gather(*tasks) + + end_time = time.time() + print(f"Sent 100 webhooks in {end_time - start_time:.2f} seconds") + +# Run the test +asyncio.run(test_concurrent_webhooks()) +``` + +### Memory Usage Testing +```bash +# Monitor memory usage during webhook delivery +pip install memory_profiler + +# Profile webhook plugin +python -m memory_profiler test_webhook_memory.py +``` + +## 6. Error Scenario Testing + +### Testing Network Failures +```python +# Mock network timeouts +mock_client.post.side_effect = asyncio.TimeoutError("Request timed out") + +# Mock connection errors +mock_client.post.side_effect = httpx.ConnectError("Connection failed") + +# Mock HTTP errors +mock_response.status_code = 503 +mock_client.post.return_value = mock_response +``` + +### Testing Authentication Failures +```bash +# Test with invalid bearer token +curl -X POST -H "Authorization: Bearer invalid-token" \ + -H "Content-Type: application/json" \ + -d '{"jsonrpc":"2.0","id":1,"method":"test","params":{}}' \ + http://localhost:8000/ +``` + +## 7. Security Testing + +### Testing HMAC Signatures +```python +import hmac +import hashlib + +def verify_webhook_signature(payload, signature, secret): + expected = hmac.new( + secret.encode('utf-8'), + payload.encode('utf-8'), + hashlib.sha256 + ).hexdigest() + return f"sha256={expected}" == signature + +# Test signature verification in your webhook receiver +``` + +### Testing Credential Security +- ✅ Verify tokens are not logged +- ✅ Verify environment variables are used for secrets +- ✅ Test with expired/invalid credentials + +## 8. Debugging Tips + +### Enable Debug Logging +```bash +export LOG_LEVEL=DEBUG +make dev +``` + +### View Webhook Plugin Logs +```bash +# Filter for webhook plugin logs +tail -f logs/mcpgateway.log | grep -i webhook + +# Or use structured logging +export MCPGATEWAY_LOG_FORMAT=json +``` + +### Common Issues and Solutions + +**Issue**: Webhooks not being sent +- Check `enabled: true` in webhook config +- Verify events match what's being triggered +- Check plugin priority and hooks configuration + +**Issue**: Authentication failures +- Verify environment variables are set correctly +- Check token/key format and headers +- Test authentication separately + +**Issue**: Template rendering errors +- Validate JSON syntax in templates +- Check variable names match available context +- Test templates with sample data + +**Issue**: Network timeouts +- Increase timeout values in webhook config +- Check network connectivity to webhook URLs +- Verify firewall and proxy settings + +## 9. CI/CD Integration + +### GitHub Actions Test Configuration +```yaml +name: Webhook Plugin Tests +on: [push, pull_request] + +jobs: + test-webhook-plugin: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Install dependencies + run: | + make venv install-dev + + - name: Run webhook plugin tests + run: | + pytest tests/unit/mcpgateway/plugins/plugins/webhook_notification/ -v --cov + + - name: Run integration tests + run: | + pytest tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_integration.py -v +``` + +### Test Coverage Requirements +- Unit tests: > 95% line coverage +- Integration tests: All major user scenarios +- Error handling: All exception paths covered + +## 10. Test Data and Fixtures + +### Sample Test Payloads +```json +{ + "tool_success": { + "event": "tool_success", + "tool_name": "search", + "user": "test@example.com", + "timestamp": "2025-01-15T10:30:45.123Z" + }, + "violation": { + "event": "violation", + "violation": { + "reason": "Rate limit exceeded", + "code": "RATE_LIMIT" + } + } +} +``` + +This comprehensive testing approach ensures the Webhook Notification Plugin is robust, reliable, and ready for production use. \ No newline at end of file diff --git a/plugins/webhook_notification/__init__.py b/plugins/webhook_notification/__init__.py new file mode 100644 index 000000000..896b30b9e --- /dev/null +++ b/plugins/webhook_notification/__init__.py @@ -0,0 +1,11 @@ +# -*- coding: utf-8 -*- +"""Webhook Notification Plugin for MCP Gateway. + +This package provides webhook notification capabilities for the MCP Gateway, +allowing administrators to receive HTTP notifications on various events, +violations, and state changes. +""" + +from .webhook_notification import WebhookNotificationPlugin + +__all__ = ["WebhookNotificationPlugin"] \ No newline at end of file diff --git a/plugins/webhook_notification/plugin-manifest.yaml b/plugins/webhook_notification/plugin-manifest.yaml new file mode 100644 index 000000000..71a1a8901 --- /dev/null +++ b/plugins/webhook_notification/plugin-manifest.yaml @@ -0,0 +1,27 @@ +description: "Send HTTP webhook notifications on events, violations, and state changes" +author: "Claude Code Assistant" +version: "1.0.0" +available_hooks: + - "prompt_pre_fetch" + - "prompt_post_fetch" + - "tool_pre_invoke" + - "tool_post_invoke" + - "resource_pre_fetch" + - "resource_post_fetch" +default_configs: + webhooks: [] + payload_templates: {} + default_template: | + { + "event": "{{event}}", + "plugin": "{{plugin_name}}", + "timestamp": "{{timestamp}}", + "request_id": "{{request_id}}", + "user": "{{user}}", + "tenant_id": "{{tenant_id}}", + "server_id": "{{server_id}}", + "violation": {{violation}}, + "metadata": {{metadata}} + } + include_payload_data: false + max_payload_size: 1000 \ No newline at end of file diff --git a/plugins/webhook_notification/test_config.yaml b/plugins/webhook_notification/test_config.yaml new file mode 100644 index 000000000..c2a729557 --- /dev/null +++ b/plugins/webhook_notification/test_config.yaml @@ -0,0 +1,144 @@ +# Test configuration for Webhook Notification Plugin +# This file can be used for testing the webhook plugin in development + +plugins: + # PII Filter Plugin (will generate violations for testing) + - name: "PIIFilter" + kind: "plugins.pii_filter.pii_filter.PIIFilterPlugin" + hooks: ["tool_pre_invoke", "tool_post_invoke"] + mode: "permissive" # Don't block, just detect + priority: 100 + config: + detect_email: true + detect_phone: true + detect_ssn: true + default_mask_strategy: "partial" + block_on_detection: false + log_detections: true + + # Rate Limiter Plugin (will generate rate limit violations) + - name: "RateLimiter" + kind: "plugins.rate_limiter.rate_limiter.RateLimiterPlugin" + hooks: ["tool_pre_invoke"] + mode: "permissive" # Don't block for testing + priority: 200 + config: + by_user: "5/m" # Low limit for easy testing + by_tenant: "50/m" + + # Webhook Notification Plugin (main plugin being tested) + - name: "WebhookNotification" + kind: "plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin" + hooks: ["tool_pre_invoke", "tool_post_invoke", "prompt_post_fetch", "resource_post_fetch"] + mode: "permissive" + priority: 900 # Run after other plugins to capture their violations + config: + webhooks: + # Webhook for testing with ngrok or local server + - url: "http://localhost:3000/webhook" + events: ["violation", "tool_success", "tool_error", "rate_limit_exceeded", "pii_detected"] + authentication: + type: "none" + retry_attempts: 2 + retry_delay: 1000 + timeout: 10 + enabled: true + + # Webhook.site for easy testing (replace with your URL) + - url: "https://webhook.site/your-unique-url" + events: ["violation", "rate_limit_exceeded"] + authentication: + type: "bearer" + token: "test-token-123" + retry_attempts: 1 + retry_delay: 500 + timeout: 5 + enabled: false # Disabled by default - enable with your URL + + # Example Slack webhook (replace with your webhook URL) + - url: "${env.SLACK_WEBHOOK_URL}" + events: ["violation", "pii_detected", "harmful_content"] + authentication: + type: "none" + retry_attempts: 3 + retry_delay: 2000 + timeout: 15 + enabled: false # Disabled by default - enable with env var + + # Custom payload templates for different notification channels + payload_templates: + violation: | + { + "text": "🚨 MCP Gateway Violation Alert", + "blocks": [ + { + "type": "section", + "text": { + "type": "mrkdwn", + "text": "*Violation:* {{violation.reason}}\n*Description:* {{violation.description}}\n*Code:* {{violation.code}}" + } + }, + { + "type": "section", + "fields": [ + { + "type": "mrkdwn", + "text": "*User:* {{user}}" + }, + { + "type": "mrkdwn", + "text": "*Request ID:* {{request_id}}" + }, + { + "type": "mrkdwn", + "text": "*Timestamp:* {{timestamp}}" + }, + { + "type": "mrkdwn", + "text": "*Plugin:* {{plugin_name}}" + } + ] + } + ] + } + + rate_limit_exceeded: | + { + "alert": "Rate Limit Exceeded", + "severity": "warning", + "message": "User {{user}} has exceeded their rate limit", + "timestamp": "{{timestamp}}", + "request_id": "{{request_id}}", + "details": {{violation.details}}, + "reset_info": "Rate limit will reset in {{violation.details.reset_in}} seconds" + } + + pii_detected: | + { + "alert": "PII Detection", + "severity": "high", + "message": "Personally identifiable information detected and masked", + "user": "{{user}}", + "timestamp": "{{timestamp}}", + "request_id": "{{request_id}}", + "detection_details": "PII was found and automatically processed according to policy" + } + + tool_success: | + { + "event": "tool_execution_success", + "tool_name": "{{metadata.tool_name}}", + "user": "{{user}}", + "timestamp": "{{timestamp}}", + "request_id": "{{request_id}}" + } + + # Include payload data for debugging (be careful in production) + include_payload_data: true + max_payload_size: 2000 + +plugin_settings: + plugin_timeout: 30 + fail_on_plugin_error: false + +plugin_dirs: [] \ No newline at end of file diff --git a/plugins/webhook_notification/webhook_notification.py b/plugins/webhook_notification/webhook_notification.py new file mode 100644 index 000000000..01c6e5fc1 --- /dev/null +++ b/plugins/webhook_notification/webhook_notification.py @@ -0,0 +1,316 @@ +# -*- coding: utf-8 -*- +"""Location: ./plugins/webhook_notification/webhook_notification.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 + +Webhook Notification Plugin. +Sends HTTP webhook notifications on specific events, violations, or state changes. +Supports multiple webhooks, event filtering, retry logic, and authentication. +""" + +from __future__ import annotations + +# Standard +import asyncio +import hashlib +import hmac +import json +import logging +import time +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Dict, List, Optional, Union + +# Third-Party +import httpx +from pydantic import BaseModel, Field + +# First-Party +from mcpgateway.plugins.framework import ( + Plugin, + PluginConfig, + PluginContext, + PluginViolation, + PromptPrehookPayload, + PromptPrehookResult, + PromptPosthookPayload, + PromptPosthookResult, + ResourcePreFetchPayload, + ResourcePreFetchResult, + ResourcePostFetchPayload, + ResourcePostFetchResult, + ToolPreInvokePayload, + ToolPreInvokeResult, + ToolPostInvokePayload, + ToolPostInvokeResult, +) + +logger = logging.getLogger(__name__) + + +class EventType(str, Enum): + """Event types for webhook notifications.""" + VIOLATION = "violation" + RATE_LIMIT_EXCEEDED = "rate_limit_exceeded" + PII_DETECTED = "pii_detected" + HARMFUL_CONTENT = "harmful_content" + TOOL_SUCCESS = "tool_success" + TOOL_ERROR = "tool_error" + PROMPT_SUCCESS = "prompt_success" + RESOURCE_SUCCESS = "resource_success" + PLUGIN_ERROR = "plugin_error" + + +class AuthenticationType(str, Enum): + """Authentication types for webhook requests.""" + NONE = "none" + BEARER = "bearer" + API_KEY = "api_key" + HMAC = "hmac" + + +class AuthenticationConfig(BaseModel): + """Authentication configuration for webhooks.""" + type: AuthenticationType = AuthenticationType.NONE + token: Optional[str] = None + api_key: Optional[str] = None + api_key_header: str = "X-API-Key" + hmac_secret: Optional[str] = None + hmac_algorithm: str = "sha256" + hmac_header: str = "X-Signature" + + +class WebhookConfig(BaseModel): + """Configuration for a single webhook endpoint.""" + url: str = Field(description="Webhook URL") + events: List[EventType] = Field(default_factory=lambda: [EventType.VIOLATION]) + authentication: AuthenticationConfig = Field(default_factory=AuthenticationConfig) + retry_attempts: int = Field(default=3, ge=0, le=10) + retry_delay: int = Field(default=1000, ge=100, le=60000, description="Delay in milliseconds") + timeout: int = Field(default=10, ge=1, le=120, description="Request timeout in seconds") + enabled: bool = True + + +class WebhookNotificationConfig(BaseModel): + """Configuration for the webhook notification plugin.""" + webhooks: List[WebhookConfig] = Field(default_factory=list) + payload_templates: Dict[str, str] = Field(default_factory=dict) + default_template: str = Field(default="""{ + "event": "{{event}}", + "plugin": "{{plugin_name}}", + "timestamp": "{{timestamp}}", + "request_id": "{{request_id}}", + "user": "{{user}}", + "tenant_id": "{{tenant_id}}", + "server_id": "{{server_id}}", + "violation": {{violation}}, + "metadata": {{metadata}} +}""") + include_payload_data: bool = Field(default=False, description="Include request payload in notifications") + max_payload_size: int = Field(default=1000, description="Max payload size to include in notifications") + + +class WebhookNotificationPlugin(Plugin): + """Plugin for sending webhook notifications on events and violations.""" + + def __init__(self, config: PluginConfig) -> None: + super().__init__(config) + self._cfg = WebhookNotificationConfig(**(config.config or {})) + self._client = httpx.AsyncClient() + + async def _render_template(self, template: str, context: Dict[str, Any]) -> str: + """Render a Jinja2-style template with the given context.""" + # Simple template substitution for now - could be enhanced with Jinja2 + result = template + for key, value in context.items(): + placeholder = f"{{{{{key}}}}}" + if isinstance(value, (dict, list)): + result = result.replace(placeholder, json.dumps(value)) + else: + result = result.replace(placeholder, str(value or "")) + return result + + def _create_hmac_signature(self, payload: str, secret: str, algorithm: str) -> str: + """Create HMAC signature for the payload.""" + hash_func = getattr(hashlib, algorithm, hashlib.sha256) + signature = hmac.new( + secret.encode('utf-8'), + payload.encode('utf-8'), + hash_func + ).hexdigest() + return f"{algorithm}={signature}" + + async def _send_webhook( + self, + webhook: WebhookConfig, + event: EventType, + context: PluginContext, + violation: Optional[PluginViolation] = None, + metadata: Optional[Dict[str, Any]] = None, + payload_data: Optional[Dict[str, Any]] = None + ) -> None: + """Send a webhook notification with retry logic.""" + if not webhook.enabled or event not in webhook.events: + return + + # Prepare context for template rendering + template_context = { + "event": event.value, + "plugin_name": self.name, + "timestamp": datetime.now(timezone.utc).isoformat(), + "request_id": context.global_context.request_id, + "user": context.global_context.user, + "tenant_id": context.global_context.tenant_id, + "server_id": context.global_context.server_id, + "violation": violation.dict() if violation else None, + "metadata": metadata or {}, + } + + # Add payload data if enabled and size is reasonable + if self._cfg.include_payload_data and payload_data: + payload_str = json.dumps(payload_data) + if len(payload_str) <= self._cfg.max_payload_size: + template_context["payload"] = payload_data + + # Select template + template = self._cfg.payload_templates.get(event.value, self._cfg.default_template) + + try: + payload_json = await self._render_template(template, template_context) + payload_bytes = payload_json.encode('utf-8') + except Exception as e: + logger.error(f"Failed to render webhook template for {event.value}: {e}") + return + + # Prepare headers + headers = { + "Content-Type": "application/json", + "User-Agent": "MCP-Gateway-Webhook-Plugin/1.0" + } + + # Add authentication + auth_config = webhook.authentication + if auth_config.type == AuthenticationType.BEARER and auth_config.token: + headers["Authorization"] = f"Bearer {auth_config.token}" + elif auth_config.type == AuthenticationType.API_KEY and auth_config.api_key: + headers[auth_config.api_key_header] = auth_config.api_key + elif auth_config.type == AuthenticationType.HMAC and auth_config.hmac_secret: + signature = self._create_hmac_signature(payload_json, auth_config.hmac_secret, auth_config.hmac_algorithm) + headers[auth_config.hmac_header] = signature + + # Attempt delivery with retry logic + for attempt in range(webhook.retry_attempts + 1): + try: + response = await self._client.post( + webhook.url, + content=payload_bytes, + headers=headers, + timeout=webhook.timeout + ) + + if 200 <= response.status_code < 300: + logger.debug(f"Webhook delivered successfully to {webhook.url} on attempt {attempt + 1}") + return + else: + logger.warning(f"Webhook delivery failed with status {response.status_code} to {webhook.url}") + + except Exception as e: + logger.warning(f"Webhook delivery attempt {attempt + 1} failed to {webhook.url}: {e}") + + # Don't sleep after the last attempt + if attempt < webhook.retry_attempts: + delay_seconds = webhook.retry_delay / 1000.0 * (2 ** attempt) # Exponential backoff + await asyncio.sleep(delay_seconds) + + logger.error(f"All webhook delivery attempts failed for {webhook.url}") + + async def _notify_webhooks( + self, + event: EventType, + context: PluginContext, + violation: Optional[PluginViolation] = None, + metadata: Optional[Dict[str, Any]] = None, + payload_data: Optional[Dict[str, Any]] = None + ) -> None: + """Send notifications to all configured webhooks.""" + if not self._cfg.webhooks: + return + + # Send webhooks concurrently + tasks = [ + self._send_webhook(webhook, event, context, violation, metadata, payload_data) + for webhook in self._cfg.webhooks + ] + + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + + def _determine_event_type(self, violation: Optional[PluginViolation]) -> EventType: + """Determine event type based on violation details.""" + if not violation: + return EventType.TOOL_SUCCESS + + if violation.code == "RATE_LIMIT": + return EventType.RATE_LIMIT_EXCEEDED + elif "pii" in violation.reason.lower(): + return EventType.PII_DETECTED + elif "harmful" in violation.reason.lower() or "content" in violation.reason.lower(): + return EventType.HARMFUL_CONTENT + else: + return EventType.VIOLATION + + async def prompt_pre_fetch(self, payload: PromptPrehookPayload, context: PluginContext) -> PromptPrehookResult: + """Hook for prompt pre-fetch events.""" + return PromptPrehookResult() + + async def prompt_post_fetch(self, payload: PromptPosthookPayload, context: PluginContext) -> PromptPosthookResult: + """Hook for prompt post-fetch events.""" + await self._notify_webhooks( + EventType.PROMPT_SUCCESS, + context, + metadata={"prompt_name": payload.name} + ) + return PromptPosthookResult() + + async def tool_pre_invoke(self, payload: ToolPreInvokePayload, context: PluginContext) -> ToolPreInvokeResult: + """Hook for tool pre-invoke events.""" + return ToolPreInvokeResult() + + async def tool_post_invoke(self, payload: ToolPostInvokePayload, context: PluginContext) -> ToolPostInvokeResult: + """Hook for tool post-invoke events.""" + # Check if there was an error in the result + event = EventType.TOOL_SUCCESS + metadata = {"tool_name": payload.name} + + if hasattr(payload.result, 'error') and payload.result.error: + event = EventType.TOOL_ERROR + metadata["error"] = str(payload.result.error) + + payload_data = None + if self._cfg.include_payload_data: + payload_data = {"tool_name": payload.name, "args": payload.result} + + await self._notify_webhooks(event, context, metadata=metadata, payload_data=payload_data) + return ToolPostInvokeResult() + + async def resource_pre_fetch(self, payload: ResourcePreFetchPayload, context: PluginContext) -> ResourcePreFetchResult: + """Hook for resource pre-fetch events.""" + return ResourcePreFetchResult() + + async def resource_post_fetch(self, payload: ResourcePostFetchPayload, context: PluginContext) -> ResourcePostFetchResult: + """Hook for resource post-fetch events.""" + await self._notify_webhooks( + EventType.RESOURCE_SUCCESS, + context, + metadata={"resource_uri": payload.uri} + ) + return ResourcePostFetchResult() + + async def __aenter__(self): + """Async context manager entry.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit - cleanup HTTP client.""" + if hasattr(self, '_client'): + await self._client.aclose() \ No newline at end of file diff --git a/tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_integration.py b/tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_integration.py new file mode 100644 index 000000000..699200db1 --- /dev/null +++ b/tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_integration.py @@ -0,0 +1,359 @@ +# -*- coding: utf-8 -*- +"""Location: ./tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_integration.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 + +Integration tests for WebhookNotificationPlugin with PluginManager. +""" + +import json +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from mcpgateway.plugins.framework.manager import PluginManager +from mcpgateway.plugins.framework.models import ( + GlobalContext, + ToolPostInvokePayload, + PluginViolation, +) + + +@pytest.mark.asyncio +async def test_webhook_plugin_with_manager(): + """Test webhook plugin integration with PluginManager.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create test config file + config_content = """ +plugins: + - name: "WebhookNotification" + kind: "plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin" + hooks: ["tool_post_invoke"] + mode: "permissive" + priority: 900 + config: + webhooks: + - url: "https://test.example.com/webhook" + events: ["tool_success", "tool_error"] + authentication: + type: "bearer" + token: "test-token" + retry_attempts: 1 + timeout: 5 + enabled: true + include_payload_data: false + +plugin_settings: + plugin_timeout: 10 + fail_on_plugin_error: false + +plugin_dirs: [] +""" + config_path = Path(tmp_dir) / "test_config.yaml" + config_path.write_text(config_content) + + # Mock HTTP client for webhook delivery + with patch('plugins.webhook_notification.webhook_notification.httpx.AsyncClient') as mock_client_class: + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.post.return_value = mock_response + mock_client_class.return_value = mock_client + + # Initialize plugin manager + manager = PluginManager(str(config_path), timeout=10) + await manager.initialize() + + try: + # Create test context and payload + context = GlobalContext( + request_id="test-req-123", + user="testuser@example.com", + tenant_id="test-tenant", + server_id="test-server" + ) + + payload = ToolPostInvokePayload( + name="search_tool", + result={"status": "success", "results": ["item1", "item2"]} + ) + + # Execute tool post-invoke hook + result, final_context = await manager.tool_post_invoke(payload, context) + + # Verify result + assert result.continue_processing is True + assert result.violation is None + + # Verify webhook was called + mock_client.post.assert_called_once() + + # Verify the webhook call details + call_args = mock_client.post.call_args + assert call_args[0][0] == "https://test.example.com/webhook" # URL + assert call_args[1]["timeout"] == 5 + + # Verify headers + headers = call_args[1]["headers"] + assert headers["Authorization"] == "Bearer test-token" + assert headers["Content-Type"] == "application/json" + + # Verify payload structure + payload_data = json.loads(call_args[1]["content"]) + assert payload_data["event"] == "tool_success" + assert payload_data["request_id"] == "test-req-123" + assert payload_data["user"] == "testuser@example.com" + assert payload_data["metadata"]["tool_name"] == "search_tool" + + finally: + await manager.shutdown() + + +@pytest.mark.asyncio +async def test_webhook_plugin_violation_handling(): + """Test webhook plugin handles violations from other plugins.""" + with tempfile.TemporaryDirectory() as tmp_dir: + # Create config with webhook plugin and a deny filter that will create violations + config_content = """ +plugins: + - name: "DenyFilter" + kind: "plugins.deny_filter.deny.DenyListPlugin" + hooks: ["tool_pre_invoke"] + mode: "enforce" + priority: 100 + config: + words: ["forbidden"] + + - name: "WebhookNotification" + kind: "plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin" + hooks: ["tool_pre_invoke"] + mode: "permissive" + priority: 900 + config: + webhooks: + - url: "https://violations.example.com/webhook" + events: ["violation"] + authentication: + type: "none" + retry_attempts: 1 + enabled: true + +plugin_settings: + plugin_timeout: 10 + fail_on_plugin_error: false + +plugin_dirs: [] +""" + config_path = Path(tmp_dir) / "test_config.yaml" + config_path.write_text(config_content) + + # Mock HTTP client + with patch('plugins.webhook_notification.webhook_notification.httpx.AsyncClient') as mock_client_class: + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.post.return_value = mock_response + mock_client_class.return_value = mock_client + + manager = PluginManager(str(config_path), timeout=10) + await manager.initialize() + + try: + context = GlobalContext(request_id="violation-test", user="testuser") + + # Create payload with forbidden word that will trigger deny filter + from mcpgateway.plugins.framework.models import ToolPreInvokePayload + payload = ToolPreInvokePayload( + name="test_tool", + args={"query": "this contains forbidden word"} + ) + + # Execute - should be blocked by deny filter + result, final_context = await manager.tool_pre_invoke(payload, context) + + # Verify the request was blocked + assert result.continue_processing is False + assert result.violation is not None + + # Note: In this test, the webhook plugin runs but may not send a webhook + # because the violation occurred in another plugin, not the webhook plugin itself + # The webhook plugin primarily sends notifications for successful operations + # and its own violations, not violations from other plugins in the same hook + + finally: + await manager.shutdown() + + +@pytest.mark.asyncio +async def test_webhook_plugin_multiple_webhooks(): + """Test webhook plugin with multiple configured webhooks.""" + with tempfile.TemporaryDirectory() as tmp_dir: + config_content = """ +plugins: + - name: "WebhookNotification" + kind: "plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin" + hooks: ["tool_post_invoke"] + mode: "permissive" + priority: 500 + config: + webhooks: + - url: "https://slack.example.com/webhook" + events: ["tool_success", "violation"] + authentication: + type: "bearer" + token: "slack-token" + retry_attempts: 2 + enabled: true + + - url: "https://monitoring.example.com/webhook" + events: ["tool_success", "tool_error"] + authentication: + type: "api_key" + api_key: "monitor-key" + api_key_header: "X-Monitor-Key" + retry_attempts: 1 + enabled: true + + - url: "https://disabled.example.com/webhook" + events: ["tool_success"] + enabled: false # This one is disabled + +plugin_settings: + plugin_timeout: 15 + fail_on_plugin_error: false + +plugin_dirs: [] +""" + config_path = Path(tmp_dir) / "test_config.yaml" + config_path.write_text(config_content) + + with patch('plugins.webhook_notification.webhook_notification.httpx.AsyncClient') as mock_client_class: + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.post.return_value = mock_response + mock_client_class.return_value = mock_client + + manager = PluginManager(str(config_path), timeout=15) + await manager.initialize() + + try: + context = GlobalContext(request_id="multi-webhook-test", user="testuser") + + payload = ToolPostInvokePayload( + name="analytics_tool", + result={"processed": 100, "errors": 0} + ) + + # Execute hook + result, final_context = await manager.tool_post_invoke(payload, context) + + assert result.continue_processing is True + + # Verify two webhooks were called (not the disabled one) + assert mock_client.post.call_count == 2 + + # Check that different authentication methods were used + call_args_list = mock_client.post.call_args_list + + # Find Slack webhook call + slack_call = None + monitor_call = None + + for call in call_args_list: + url = call[0][0] + headers = call[1]["headers"] + + if "slack.example.com" in url: + slack_call = call + assert headers["Authorization"] == "Bearer slack-token" + elif "monitoring.example.com" in url: + monitor_call = call + assert headers["X-Monitor-Key"] == "monitor-key" + + assert slack_call is not None, "Slack webhook was not called" + assert monitor_call is not None, "Monitoring webhook was not called" + + # Verify disabled webhook was not called + for call in call_args_list: + url = call[0][0] + assert "disabled.example.com" not in url + + finally: + await manager.shutdown() + + +@pytest.mark.asyncio +async def test_webhook_plugin_template_customization(): + """Test webhook plugin with custom payload templates.""" + with tempfile.TemporaryDirectory() as tmp_dir: + config_content = """ +plugins: + - name: "WebhookNotification" + kind: "plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin" + hooks: ["tool_post_invoke"] + mode: "permissive" + priority: 500 + config: + webhooks: + - url: "https://custom.example.com/webhook" + events: ["tool_success"] + authentication: + type: "none" + retry_attempts: 1 + enabled: true + payload_templates: + tool_success: | + { + "alert_type": "success", + "service": "mcp-gateway", + "tool": "{{metadata.tool_name}}", + "user": "{{user}}", + "timestamp": "{{timestamp}}" + } + +plugin_settings: + plugin_timeout: 10 + fail_on_plugin_error: false + +plugin_dirs: [] +""" + config_path = Path(tmp_dir) / "test_config.yaml" + config_path.write_text(config_content) + + with patch('plugins.webhook_notification.webhook_notification.httpx.AsyncClient') as mock_client_class: + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.post.return_value = mock_response + mock_client_class.return_value = mock_client + + manager = PluginManager(str(config_path), timeout=10) + await manager.initialize() + + try: + context = GlobalContext(request_id="template-test", user="template_user") + + payload = ToolPostInvokePayload( + name="custom_tool", + result={"data": "test"} + ) + + await manager.tool_post_invoke(payload, context) + + # Verify webhook was called with custom template + mock_client.post.assert_called_once() + call_args = mock_client.post.call_args + + payload_data = json.loads(call_args[1]["content"]) + + # Check custom template fields + assert payload_data["alert_type"] == "success" + assert payload_data["service"] == "mcp-gateway" + assert "custom_tool" in str(payload_data) # Tool name should be in metadata + assert payload_data["user"] == "template_user" + + finally: + await manager.shutdown() \ No newline at end of file diff --git a/tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_notification.py b/tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_notification.py new file mode 100644 index 000000000..7151aadc3 --- /dev/null +++ b/tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_notification.py @@ -0,0 +1,475 @@ +# -*- coding: utf-8 -*- +"""Location: ./tests/unit/mcpgateway/plugins/plugins/webhook_notification/test_webhook_notification.py +Copyright 2025 +SPDX-License-Identifier: Apache-2.0 + +Tests for WebhookNotificationPlugin. +""" + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from mcpgateway.plugins.framework.models import ( + GlobalContext, + HookType, + PluginConfig, + PluginContext, + PluginViolation, + PromptPrehookPayload, + ToolPostInvokePayload, + ToolPreInvokePayload, +) +from plugins.webhook_notification.webhook_notification import ( + AuthenticationType, + EventType, + WebhookNotificationPlugin, +) + + +def _create_plugin(config_dict=None) -> WebhookNotificationPlugin: + """Helper to create webhook notification plugin with test config.""" + default_config = { + "webhooks": [ + { + "url": "https://hooks.example.com/webhook", + "events": ["violation", "tool_success"], + "authentication": {"type": "none"}, + "retry_attempts": 1, + "retry_delay": 100, + "timeout": 5, + "enabled": True, + } + ], + "include_payload_data": False, + "max_payload_size": 1000, + } + + if config_dict: + default_config.update(config_dict) + + return WebhookNotificationPlugin( + PluginConfig( + name="webhook_test", + kind="plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin", + hooks=[HookType.TOOL_POST_INVOKE], + config=default_config, + ) + ) + + +def _create_context(user="testuser", request_id="req-123") -> PluginContext: + """Helper to create plugin context.""" + return PluginContext( + global_context=GlobalContext( + request_id=request_id, + user=user, + tenant_id="tenant-abc", + server_id="server-xyz" + ) + ) + + +class TestWebhookNotificationPlugin: + """Test cases for WebhookNotificationPlugin.""" + + @pytest.mark.asyncio + async def test_plugin_initialization(self): + """Test plugin initializes correctly with default config.""" + plugin = _create_plugin() + assert plugin.name == "webhook_test" + assert len(plugin._cfg.webhooks) == 1 + assert plugin._cfg.webhooks[0].url == "https://hooks.example.com/webhook" + + @pytest.mark.asyncio + async def test_template_rendering(self): + """Test template rendering with context variables.""" + plugin = _create_plugin() + + template = '{"event": "{{event}}", "user": "{{user}}", "timestamp": "{{timestamp}}"}' + context_vars = { + "event": "test_event", + "user": "testuser", + "timestamp": "2025-01-15T10:30:45.123Z" + } + + result = await plugin._render_template(template, context_vars) + parsed = json.loads(result) + + assert parsed["event"] == "test_event" + assert parsed["user"] == "testuser" + assert parsed["timestamp"] == "2025-01-15T10:30:45.123Z" + + @pytest.mark.asyncio + async def test_hmac_signature_creation(self): + """Test HMAC signature creation.""" + plugin = _create_plugin() + + payload = '{"test": "data"}' + secret = "test-secret" + algorithm = "sha256" + + signature = plugin._create_hmac_signature(payload, secret, algorithm) + + assert signature.startswith("sha256=") + assert len(signature) > 10 # Basic length check + + @pytest.mark.asyncio + @patch('plugins.webhook_notification.webhook_notification.httpx.AsyncClient') + async def test_webhook_delivery_success(self, mock_client_class): + """Test successful webhook delivery.""" + # Setup mocks + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.post.return_value = mock_response + mock_client_class.return_value = mock_client + + plugin = _create_plugin() + plugin._client = mock_client + + context = _create_context() + webhook = plugin._cfg.webhooks[0] + + await plugin._send_webhook( + webhook=webhook, + event=EventType.TOOL_SUCCESS, + context=context, + metadata={"tool_name": "test_tool"} + ) + + # Verify webhook was called + mock_client.post.assert_called_once() + call_args = mock_client.post.call_args + + assert call_args[1]["timeout"] == 5 + assert "Content-Type" in call_args[1]["headers"] + assert call_args[1]["headers"]["Content-Type"] == "application/json" + + @pytest.mark.asyncio + @patch('plugins.webhook_notification.webhook_notification.httpx.AsyncClient') + async def test_webhook_delivery_with_bearer_auth(self, mock_client_class): + """Test webhook delivery with bearer token authentication.""" + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.post.return_value = mock_response + mock_client_class.return_value = mock_client + + config = { + "webhooks": [ + { + "url": "https://hooks.example.com/webhook", + "events": ["tool_success"], + "authentication": { + "type": "bearer", + "token": "test-token-123" + }, + "retry_attempts": 1, + "enabled": True, + } + ] + } + + plugin = _create_plugin(config) + plugin._client = mock_client + + context = _create_context() + webhook = plugin._cfg.webhooks[0] + + await plugin._send_webhook( + webhook=webhook, + event=EventType.TOOL_SUCCESS, + context=context + ) + + # Verify Authorization header was set + call_args = mock_client.post.call_args + headers = call_args[1]["headers"] + assert "Authorization" in headers + assert headers["Authorization"] == "Bearer test-token-123" + + @pytest.mark.asyncio + @patch('plugins.webhook_notification.webhook_notification.httpx.AsyncClient') + async def test_webhook_delivery_with_api_key_auth(self, mock_client_class): + """Test webhook delivery with API key authentication.""" + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + mock_client.post.return_value = mock_response + mock_client_class.return_value = mock_client + + config = { + "webhooks": [ + { + "url": "https://hooks.example.com/webhook", + "events": ["tool_success"], + "authentication": { + "type": "api_key", + "api_key": "test-api-key", + "api_key_header": "X-API-Key" + }, + "retry_attempts": 1, + "enabled": True, + } + ] + } + + plugin = _create_plugin(config) + plugin._client = mock_client + + context = _create_context() + webhook = plugin._cfg.webhooks[0] + + await plugin._send_webhook( + webhook=webhook, + event=EventType.TOOL_SUCCESS, + context=context + ) + + # Verify API key header was set + call_args = mock_client.post.call_args + headers = call_args[1]["headers"] + assert "X-API-Key" in headers + assert headers["X-API-Key"] == "test-api-key" + + @pytest.mark.asyncio + @patch('plugins.webhook_notification.webhook_notification.httpx.AsyncClient') + async def test_webhook_delivery_retry_on_failure(self, mock_client_class): + """Test webhook retry logic on failure.""" + mock_client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 500 # Server error + mock_client.post.return_value = mock_response + mock_client_class.return_value = mock_client + + config = { + "webhooks": [ + { + "url": "https://hooks.example.com/webhook", + "events": ["tool_success"], + "authentication": {"type": "none"}, + "retry_attempts": 3, + "retry_delay": 10, # Small delay for testing + "enabled": True, + } + ] + } + + plugin = _create_plugin(config) + plugin._client = mock_client + + context = _create_context() + webhook = plugin._cfg.webhooks[0] + + await plugin._send_webhook( + webhook=webhook, + event=EventType.TOOL_SUCCESS, + context=context + ) + + # Verify webhook was retried (1 initial + 3 retries = 4 total calls) + assert mock_client.post.call_count == 4 + + @pytest.mark.asyncio + async def test_event_type_determination(self): + """Test event type determination from violations.""" + plugin = _create_plugin() + + # Test rate limit violation + rate_limit_violation = PluginViolation( + reason="Rate limit exceeded", + description="Too many requests", + code="RATE_LIMIT", + details={} + ) + + event_type = plugin._determine_event_type(rate_limit_violation) + assert event_type == EventType.RATE_LIMIT_EXCEEDED + + # Test PII violation + pii_violation = PluginViolation( + reason="PII detected in content", + description="Email found", + code="PII_DETECTED", + details={} + ) + + event_type = plugin._determine_event_type(pii_violation) + assert event_type == EventType.PII_DETECTED + + # Test no violation (success case) + event_type = plugin._determine_event_type(None) + assert event_type == EventType.TOOL_SUCCESS + + @pytest.mark.asyncio + @patch('plugins.webhook_notification.webhook_notification.WebhookNotificationPlugin._notify_webhooks') + async def test_tool_post_invoke_hook(self, mock_notify): + """Test tool_post_invoke hook triggers notification.""" + plugin = _create_plugin() + context = _create_context() + + payload = ToolPostInvokePayload( + name="test_tool", + result={"success": True, "data": "test result"} + ) + + result = await plugin.tool_post_invoke(payload, context) + + # Verify notification was triggered + mock_notify.assert_called_once() + call_args = mock_notify.call_args[0] + assert call_args[0] == EventType.TOOL_SUCCESS # event type + assert call_args[1] == context # context + + @pytest.mark.asyncio + async def test_webhook_filtering_by_event_type(self): + """Test webhooks are filtered by event type.""" + config = { + "webhooks": [ + { + "url": "https://hooks.example.com/webhook1", + "events": ["violation"], # Only violations + "authentication": {"type": "none"}, + "enabled": True, + }, + { + "url": "https://hooks.example.com/webhook2", + "events": ["tool_success"], # Only tool success + "authentication": {"type": "none"}, + "enabled": True, + } + ] + } + + plugin = _create_plugin(config) + + # Mock the send_webhook method to track calls + plugin._send_webhook = AsyncMock() + + context = _create_context() + + # Send a tool success event + await plugin._notify_webhooks( + event=EventType.TOOL_SUCCESS, + context=context + ) + + # Verify only the second webhook (tool_success) was called + assert plugin._send_webhook.call_count == 2 # Both webhooks checked + + # Check the calls - first webhook should not be called (wrong event type) + calls = plugin._send_webhook.call_args_list + + # Both webhooks get checked but filtering happens inside _send_webhook + assert len(calls) == 2 + + @pytest.mark.asyncio + async def test_disabled_webhook_not_called(self): + """Test disabled webhooks are not called.""" + config = { + "webhooks": [ + { + "url": "https://hooks.example.com/webhook", + "events": ["tool_success"], + "authentication": {"type": "none"}, + "enabled": False, # Disabled + } + ] + } + + plugin = _create_plugin(config) + + # Mock HTTP client to verify no calls are made + plugin._client = AsyncMock() + + context = _create_context() + webhook = plugin._cfg.webhooks[0] + + await plugin._send_webhook( + webhook=webhook, + event=EventType.TOOL_SUCCESS, + context=context + ) + + # Verify no HTTP calls were made + plugin._client.post.assert_not_called() + + @pytest.mark.asyncio + async def test_custom_payload_template(self): + """Test custom payload templates are used correctly.""" + custom_template = '{"custom": "{{event}}", "user": "{{user}}"}' + + config = { + "payload_templates": { + "tool_success": custom_template + } + } + + plugin = _create_plugin(config) + + context_vars = { + "event": "tool_success", + "user": "testuser" + } + + result = await plugin._render_template(custom_template, context_vars) + parsed = json.loads(result) + + assert parsed["custom"] == "tool_success" + assert parsed["user"] == "testuser" + + @pytest.mark.asyncio + async def test_payload_size_limiting(self): + """Test payload size limiting functionality.""" + large_payload = {"data": "x" * 2000} # Large payload + + config = { + "include_payload_data": True, + "max_payload_size": 100 # Small limit + } + + plugin = _create_plugin(config) + plugin._client = AsyncMock() + mock_response = MagicMock() + mock_response.status_code = 200 + plugin._client.post.return_value = mock_response + + context = _create_context() + webhook = plugin._cfg.webhooks[0] + + await plugin._send_webhook( + webhook=webhook, + event=EventType.TOOL_SUCCESS, + context=context, + payload_data=large_payload + ) + + # Verify webhook was still sent (payload should be excluded due to size) + plugin._client.post.assert_called_once() + + @pytest.mark.asyncio + async def test_prompt_pre_and_post_hooks_return_success(self): + """Test prompt hooks return success results.""" + plugin = _create_plugin() + context = _create_context() + + # Test pre-hook + pre_payload = PromptPrehookPayload(name="test_prompt", args={}) + pre_result = await plugin.prompt_pre_fetch(pre_payload, context) + assert pre_result.continue_processing is True + + # Test post-hook with mock notification + plugin._notify_webhooks = AsyncMock() + + from mcpgateway.plugins.framework.models import PromptPosthookPayload, PromptResult + post_payload = PromptPosthookPayload( + name="test_prompt", + result=PromptResult(messages=[]) + ) + post_result = await plugin.prompt_post_fetch(post_payload, context) + assert post_result.continue_processing is True + + # Verify notification was sent + plugin._notify_webhooks.assert_called_once() \ No newline at end of file