Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions ai_council/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from .utils.config import AICouncilConfig, load_config
from .utils.logging import configure_logging, get_logger
from .factory import AICouncilFactory
from .sanitization import SanitizationFilter


class AICouncil:
Expand Down Expand Up @@ -66,7 +67,17 @@ def __init__(self, config_path: Optional[Path] = None):

# Initialize orchestration layer
self.orchestration_layer: OrchestrationLayer = self.factory.create_orchestration_layer()


# Initialize sanitization filter (runs before prompt construction)
sanitization_config = (
config_path.parent / "sanitization_filters.yaml"
if config_path is not None
else None
)
self.sanitization_filter: SanitizationFilter = SanitizationFilter.from_config(
config_path=sanitization_config
)

self.logger.info("AI Council application initialized successfully")

async def _execute_with_timeout(
Expand Down Expand Up @@ -114,23 +125,54 @@ async def _execute_with_timeout(
)

async def process_request(
    self,
    user_input: str,
    execution_mode: ExecutionMode = ExecutionMode.BALANCED,
    *,
    session_id: str = "anonymous",
) -> FinalResponse:
    """
    Process a user request through the AI Council system.

    The Sanitization Filter runs FIRST, before any prompt construction
    or orchestration. Injection attempts are rejected immediately.

    Args:
        user_input: The user's request as a string
        execution_mode: The execution mode to use (fast, balanced, best_quality)
        session_id: Per-session key used for rate-limit tracking.

    Returns:
        FinalResponse: The final processed response
    """
    # NOTE: log message previously read "Processing request in" with a dangling
    # preposition; the mode now goes in the structured `extra` payload.
    self.logger.info(
        "Processing request", extra={"execution_mode": execution_mode.value}
    )
    # Truncate to 200 chars so raw user input does not flood the logs.
    self.logger.debug("User input", extra={"user_input": user_input[:200]})

    # ── Stage 0: Sanitization Filter ─────────────────────────────────
    # Runs before any prompt construction so injection attempts never
    # reach the orchestration layer or the underlying models.
    filter_result = self.sanitization_filter.check(user_input, source_key=session_id)
    if not filter_result.is_safe:
        # Full match details go to logs only; the caller gets a generic
        # failure response below.
        self.logger.warning(
            "Request blocked by SanitizationFilter",
            extra={
                "session_id": session_id,
                "filter": filter_result.filter_name,
                "severity": filter_result.severity.value if filter_result.severity else None,
                "rule": filter_result.triggered_rule,
            },
        )
        return FinalResponse(
            content="",
            overall_confidence=0.0,
            success=False,
            error_message=(
                "Unsafe input detected. Request blocked due to potential prompt injection."
            ),
            error_type="prompt_injection",
        )
    # ─────────────────────────────────────────────────────────────────

    return await self._execute_with_timeout(user_input, execution_mode)

async def estimate_cost(self, user_input: str, execution_mode: ExecutionMode = ExecutionMode.BALANCED) -> Dict[str, Any]:
Expand Down
27 changes: 27 additions & 0 deletions ai_council/sanitization/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
Sanitization Filter Layer for AI Council.

Provides prompt injection detection and blocking before prompt construction.

Public API:
SanitizationFilter – main entry point; chains multiple BaseFilter instances
BaseFilter – abstract base for all filter implementations
KeywordFilter – exact / substring keyword matching
RegexFilter – precompiled regex pattern matching
FilterResult – result dataclass returned by every filter
Severity – enum for LOW / MEDIUM / HIGH rule severity
"""

from .base import BaseFilter, FilterResult, Severity
from .keyword_filter import KeywordFilter
from .regex_filter import RegexFilter
from .sanitization_filter import SanitizationFilter

__all__ = [
"SanitizationFilter",
"BaseFilter",
"KeywordFilter",
"RegexFilter",
"FilterResult",
"Severity",
]
108 changes: 108 additions & 0 deletions ai_council/sanitization/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Abstract base classes and shared data types for the sanitization layer."""

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional


class Severity(str, Enum):
    """Severity level assigned to a matched rule.

    Subclasses ``str`` so members compare equal to their plain string
    values and serialize naturally (e.g. in log payloads and the
    structured error dict built by ``FilterResult.error_response``).
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"


@dataclass
class FilterResult:
    """Encapsulates the outcome of a single filter check.

    Attributes:
        is_safe: True when no threat was detected.
        triggered_rule: Human-readable description of the rule that matched.
        severity: Severity level of the detected threat.
        matched_text: The portion of the input that triggered the rule.
        filter_name: Name of the filter that produced this result.
    """

    is_safe: bool = True
    triggered_rule: Optional[str] = None
    severity: Optional[Severity] = None
    matched_text: Optional[str] = None
    filter_name: str = ""

    # Structured error payload returned to callers when the input is blocked.
    @property
    def error_response(self) -> dict:
        """Return a structured error dict when the input was blocked.

        The payload is deliberately generic: ``triggered_rule`` and
        ``matched_text`` may embed the raw keyword/regex pattern that fired,
        and exposing them to end users would help an attacker iterate around
        the filter. Full match details belong in logs/telemetry only (see
        the warning logged by the caller), never in the user-facing response.

        Returns:
            ``{}`` when ``is_safe`` is True; otherwise a dict with a generic
            ``error`` message plus non-sensitive ``details`` (filter name
            and severity only).
        """
        if self.is_safe:
            return {}
        return {
            "error": "Unsafe input detected. Request blocked due to potential prompt injection.",
            "details": {
                "filter": self.filter_name,
                "severity": self.severity.value if self.severity else None,
            },
        }


@dataclass
class RuleDefinition:
    """A single configurable detection rule.

    Attributes:
        id: Unique identifier for the rule.
        pattern: The keyword or regex pattern string. Interpretation is up
            to the filter that consumes the rule (presumably substring match
            for KeywordFilter vs. compiled regex for RegexFilter — confirm
            against those implementations).
        severity: Severity when this rule fires. Defaults to HIGH.
        enabled: Whether this rule is active. Disabled rules are dropped by
            ``BaseFilter.__init__`` and ignored by ``BaseFilter.add_rule``.
        description: Human-readable explanation of the rule.
    """

    id: str
    pattern: str
    severity: Severity = Severity.HIGH
    enabled: bool = True
    description: str = ""


class BaseFilter(ABC):
    """Abstract base class that every filter must implement.

    Subclasses should be lightweight; their :meth:`check` method is called
    synchronously in the hot path and must complete in well under 5 ms for
    typical inputs.
    """

    def __init__(self, name: str, rules: List[RuleDefinition]):
        self._name = name
        # Filter out disabled rules once, up front, so check() implementations
        # never have to consult the `enabled` flag on the hot path.
        active = [rule for rule in rules if rule.enabled]
        self._rules: List[RuleDefinition] = active

    @property
    def name(self) -> str:
        """Name identifying this filter instance."""
        return self._name

    @abstractmethod
    def check(self, text: str) -> FilterResult:
        """Inspect *text* and return a :class:`FilterResult`.

        Args:
            text: The raw user input to inspect.

        Returns:
            FilterResult with ``is_safe=True`` when no threat was detected.
        """

    def add_rule(self, rule: RuleDefinition) -> None:
        """Dynamically add a rule at runtime.

        Rules flagged ``enabled=False`` are silently ignored, mirroring the
        filtering performed in ``__init__``.
        """
        if not rule.enabled:
            return
        self._rules.append(rule)

    def disable_rule(self, rule_id: str) -> bool:
        """Disable a rule by its id. Returns True if the rule was found.

        NOTE(review): the rule is removed from the active set entirely, so
        it cannot be re-enabled later through this instance.
        """
        remaining = [rule for rule in self._rules if rule.id != rule_id]
        found = len(remaining) != len(self._rules)
        self._rules = remaining
        return found
Loading
Loading