feat: add Sanitization Filter Layer for prompt injection detection #231

ishanraj953 wants to merge 1 commit into shrixtacy:master from
- Add ai_council/sanitization/ package (KeywordFilter, RegexFilter, RateLimiter)
- Add config/sanitization_filters.yaml with 18 keyword + 10 regex rules
- Integrate filter as Stage 0 in main.py process_request()
- Add 60+ unit tests in tests/test_sanitization.py
- Add pipeline integration example in examples/sanitization_pipeline.py
📝 Walkthrough

A new prompt-injection sanitization layer is added to the AI Council framework. It includes configurable keyword and regex filters, rate limiting, configuration loading from YAML, and integration into the main request processing pipeline as a pre-processing gate (Stage 0) before orchestration execution.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant AICouncil
    participant SanitizationFilter
    participant RateLimiter
    participant KeywordFilter
    participant RegexFilter
    participant Orchestration

    Client->>AICouncil: process_request(user_input, session_id)
    AICouncil->>SanitizationFilter: check(user_input, source_key=session_id)
    SanitizationFilter->>RateLimiter: is_rate_limited(session_id)
    alt Rate Limited
        RateLimiter-->>SanitizationFilter: true
        SanitizationFilter->>RateLimiter: record_attempt(session_id)
        SanitizationFilter-->>AICouncil: FilterResult(is_safe=False, error="rate_limit")
    else Not Rate Limited
        RateLimiter-->>SanitizationFilter: false
        SanitizationFilter->>KeywordFilter: check(user_input)
        alt Keyword Match Found
            KeywordFilter-->>SanitizationFilter: FilterResult(is_safe=False)
            SanitizationFilter->>RateLimiter: record_attempt(session_id)
            SanitizationFilter-->>AICouncil: FilterResult(is_safe=False)
        else No Keyword Match
            KeywordFilter-->>SanitizationFilter: FilterResult(is_safe=True)
            SanitizationFilter->>RegexFilter: check(user_input)
            alt Regex Match Found
                RegexFilter-->>SanitizationFilter: FilterResult(is_safe=False)
                SanitizationFilter->>RateLimiter: record_attempt(session_id)
                SanitizationFilter-->>AICouncil: FilterResult(is_safe=False)
            else No Regex Match
                RegexFilter-->>SanitizationFilter: FilterResult(is_safe=True)
                SanitizationFilter-->>AICouncil: FilterResult(is_safe=True)
                AICouncil->>Orchestration: _execute_with_timeout(user_input)
                Orchestration-->>Client: FinalResponse(success=True)
            end
        end
    end
    alt Unsafe Input
        AICouncil-->>Client: FinalResponse(success=False, error_type="prompt_injection")
    end
```
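The gating flow in the diagram can be sketched as a minimal check chain. This is an illustrative sketch only: the `FilterResult` shape and the callback signatures below are simplified assumptions, and the real classes live in `ai_council/sanitization/`.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

@dataclass
class FilterResult:
    """Simplified stand-in for the package's FilterResult dataclass."""
    is_safe: bool
    triggered_rule: Optional[str] = None
    filter_name: Optional[str] = None

def check_chain(
    user_input: str,
    session_id: str,
    is_rate_limited: Callable[[str], bool],
    record_attempt: Callable[[str], None],
    filters: List[Callable[[str], FilterResult]],
) -> FilterResult:
    """Stage 0 gate: rate-limit check first, then each filter in order."""
    if is_rate_limited(session_id):
        record_attempt(session_id)  # diagram records the attempt even when throttled
        return FilterResult(False, "rate_limit", "RateLimiter")
    for check in filters:  # KeywordFilter, then RegexFilter
        result = check(user_input)
        if not result.is_safe:
            record_attempt(session_id)  # only blocked attempts feed the window
            return result
    return FilterResult(True)  # safe: caller proceeds to orchestration
```

The short-circuit ordering matters: the cheap rate-limit lookup runs before any pattern matching, and the first unsafe result stops the chain.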
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)

✅ Issue Link Status: Great job @ishanraj953! This PR is linked to the following issue(s):
Actionable comments posted: 6
🧹 Nitpick comments (6)
ai_council/sanitization/regex_filter.py (1)
43-59: Consider moving logger import to module level.

The `logging` import and `logger = logging.getLogger(__name__)` are repeated inside `_compile_rules` and `add_rule`. Move these to module level for cleaner code.

♻️ Proposed fix

```diff
 from __future__ import annotations

+import logging
 import re
 from typing import Dict, List

 from .base import BaseFilter, FilterResult, RuleDefinition, Severity

+logger = logging.getLogger(__name__)
+

 class RegexFilter(BaseFilter):
```

Then remove the local imports from `_compile_rules` and `add_rule`.
_compile_rulesandadd_rule.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/regex_filter.py` around lines 43-59: move the logging setup to module level: add a module-level import "import logging" and create a single module-level logger via "logger = logging.getLogger(__name__)" and remove the local imports/assignments inside the _compile_rules method and the add_rule method; update references in _compile_rules and add_rule to use the module-level logger variable so you no longer re-import logging or re-create the logger in those functions.

ai_council/sanitization/config_loader.py (1)
160-171: Silent fallthrough from YAML parse failure to JSON parser.

If `yaml.safe_load` raises an exception other than `ImportError` (e.g., a parse error), the code falls through to `json.loads(raw)`, which will likely fail with a confusing error. Consider catching YAML parse errors explicitly.

♻️ Proposed fix
```diff
     if path.suffix in (".yaml", ".yml"):
         try:
             import yaml  # type: ignore
             return yaml.safe_load(raw) or {}
         except ImportError:
             logger.warning("PyYAML not installed; falling back to JSON parser for %s", path)
+        except yaml.YAMLError as exc:
+            raise ValueError(f"Invalid YAML in {path}: {exc}") from exc
     return json.loads(raw)
```
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/config_loader.py` around lines 160-171: the function _load_yaml_or_json currently falls through to json.loads when yaml.safe_load raises any exception, producing confusing errors; update the YAML branch to catch YAML-specific parsing errors (e.g., yaml.YAMLError) separately from ImportError: if the yaml import fails, keep the current warning and fall back to JSON, but if yaml.safe_load raises a YAML parsing exception, log or raise a clear error (including the original exception and the path) instead of attempting json.loads; reference the _load_yaml_or_json function and the calls to yaml.safe_load and json.loads when applying the change.

config/sanitization_filters.yaml (1)
185-189: Base64 detection rule may produce false positives.

The `rx-010` pattern matches any occurrence of `base64` followed by 20+ base64 characters. This could flag legitimate use cases where users discuss base64 encoding or share encoded data that isn't malicious.
rx-010pattern matches any occurrence ofbase64followed by 20+ base64 characters. This could flag legitimate use cases where users discuss base64 encoding or share encoded data that isn't malicious.Consider adding a more specific trigger (e.g., requiring injection-related context) or documenting this as a potential source of false positives that operators may want to disable.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@config/sanitization_filters.yaml` around lines 185-189: the rx-010 base64 detection is too broad and causes false positives; narrow its scope by updating the pattern for id rx-010 to require nearby injection/malicious context (e.g., require words like "payload|exec|eval|cmd|shell|inject|malicious" in the same match or within a contextual lookbehind/lookahead) or make the rule explicitly optional/configurable (lower severity or add a disable-able flag) and add a comment documenting that operators may want to disable it for benign discussions of base64.

ai_council/sanitization/__init__.py (1)
1-13: Replace en-dash characters with hyphen-minus in docstring.

The docstring uses en-dash (`–`, U+2013) instead of hyphen-minus (`-`, U+002D), which can cause issues with some documentation tools and linters.

📝 Proposed fix
""" Sanitization Filter Layer for AI Council. Provides prompt injection detection and blocking before prompt construction. Public API: - SanitizationFilter – main entry point; chains multiple BaseFilter instances - BaseFilter – abstract base for all filter implementations - KeywordFilter – exact / substring keyword matching - RegexFilter – precompiled regex pattern matching - FilterResult – result dataclass returned by every filter - Severity – enum for LOW / MEDIUM / HIGH rule severity + SanitizationFilter - main entry point; chains multiple BaseFilter instances + BaseFilter - abstract base for all filter implementations + KeywordFilter - exact / substring keyword matching + RegexFilter - precompiled regex pattern matching + FilterResult - result dataclass returned by every filter + Severity - enum for LOW / MEDIUM / HIGH rule severity """🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/__init__.py` around lines 1-13: the module docstring contains en-dash characters (U+2013) in the public API list items (e.g., after SanitizationFilter, BaseFilter, KeywordFilter, RegexFilter, FilterResult, Severity); replace each en-dash with a standard hyphen-minus (ASCII U+002D) so the entries read like "SanitizationFilter - main entry point", and use hyphen-minus characters consistently throughout the docstring.

ai_council/sanitization/rate_limiter.py (1)
40-73: Thread safety is not a concern for current usage, but the code is not thread-safe.

`RateLimitTracker` is indeed not thread-safe: operations like `_evict()` with its while loop and deque modifications are not atomic. However, the codebase uses it only in async contexts (e.g., `examples/sanitization_pipeline.py` shows it in `asyncio.gather()`), where the single-threaded event loop ensures synchronous methods execute without interruption. Thread safety becomes a concern only if the rate limiter is passed to thread pools (via `run_in_executor` or similar). The docstring already notes this is "intentionally simple" and suggests swapping it for Redis in production, so the current design is acceptable.
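If the tracker is ever shared across threads, a lock-guarded variant is straightforward. The sketch below is an assumption-laden illustration: the attribute and method names mirror the review's description, not necessarily the project's actual class.

```python
import threading
import time
from collections import deque

class LockedRateLimitTracker:
    """Sliding-window attempt counter guarded by an RLock.

    A sketch only: names mirror the review's description of RateLimitTracker
    but may differ from the project's actual implementation.
    """

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0):
        self._max = max_attempts
        self._window = window_seconds
        self._attempts = {}  # key -> deque of monotonic timestamps
        self._lock = threading.RLock()

    def _evict(self, key: str, now: float) -> None:
        # Caller must hold the lock.
        q = self._attempts.get(key)
        while q and now - q[0] > self._window:
            q.popleft()

    def record_attempt(self, key: str) -> None:
        with self._lock:
            now = time.monotonic()
            self._attempts.setdefault(key, deque()).append(now)
            self._evict(key, now)

    def is_rate_limited(self, key: str) -> bool:
        with self._lock:
            self._evict(key, time.monotonic())
            return len(self._attempts.get(key, ())) >= self._max
```

An `RLock` (rather than a plain `Lock`) keeps the code safe if a locked method ever calls another locked method.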
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/rate_limiter.py` around lines 40-73: RateLimitTracker is not thread-safe; protect concurrent access to its internal state by adding a lock (e.g., threading.RLock) as an instance attribute and acquiring it around all operations that read or mutate _counters or call _WindowedCounter methods—specifically wrap the bodies of __init__ (when initializing _counters if necessary), record_attempt, is_rate_limited, attempt_count, and reset with the lock to ensure atomicity and prevent races when used from threads or executors.

examples/sanitization_pipeline.py (1)
39-39: Replace EN DASH with ASCII hyphen in docstrings.

These characters trigger Ruff RUF002; use `-` to avoid ambiguous Unicode punctuation in source.

Also applies to: 49-49, 87-87
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/sanitization_pipeline.py` at line 39, Replace the Unicode EN DASH (–) with an ASCII hyphen (-) in the docstrings that currently use "–", e.g., the string "Placeholder – in production this is your real PromptBuilder." and the other docstrings flagged (around lines with the same pattern at the other two occurrences); update those literal docstring texts to use "-" so Ruff RUF002 is not triggered while preserving the original wording.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f17888ed-bcad-4101-9abe-18abc3dae05d
📒 Files selected for processing (11)
- ai_council/main.py
- ai_council/sanitization/__init__.py
- ai_council/sanitization/base.py
- ai_council/sanitization/config_loader.py
- ai_council/sanitization/keyword_filter.py
- ai_council/sanitization/rate_limiter.py
- ai_council/sanitization/regex_filter.py
- ai_council/sanitization/sanitization_filter.py
- config/sanitization_filters.yaml
- examples/sanitization_pipeline.py
- tests/test_sanitization.py
In `ai_council/sanitization/config_loader.py`:

```python
def _rule_from_dict(data: Dict[str, Any]) -> RuleDefinition:
    severity_raw = data.get("severity", "high").lower()
    try:
        severity = Severity(severity_raw)
    except ValueError:
        severity = Severity.HIGH
        logger.warning("Unknown severity '%s' for rule '%s'; defaulting to HIGH.", severity_raw, data.get("id"))

    return RuleDefinition(
        id=data["id"],
        pattern=data["pattern"],
        severity=severity,
        enabled=data.get("enabled", True),
        description=data.get("description", ""),
    )
```
Missing validation for required rule fields could cause KeyError.
If a rule dict in the YAML config is missing `id` or `pattern`, `_rule_from_dict` will raise a `KeyError`. This would cause the entire config load to fail (caught at lines 203-205), falling back to defaults. Consider explicit validation with a more informative error message, or wrapping individual rule conversions to skip malformed rules gracefully.
🛡️ Proposed fix to skip malformed rules
```diff
 def _rule_from_dict(data: Dict[str, Any]) -> RuleDefinition:
+    if "id" not in data or "pattern" not in data:
+        raise ValueError(f"Rule missing required 'id' or 'pattern': {data}")
+
     severity_raw = data.get("severity", "high").lower()
```

Then in `load_rules_from_config`, wrap conversions:
```python
keyword_rules = []
for d in keyword_dicts:
    try:
        keyword_rules.append(_rule_from_dict(d))
    except (KeyError, ValueError) as exc:
        logger.warning("Skipping malformed keyword rule: %s", exc)
```
Verify each finding against the current code and only fix it if needed.
In `@ai_council/sanitization/config_loader.py` around lines 143 - 157,
_rule_from_dict currently assumes required keys and can raise KeyError; add
explicit validation at the top of _rule_from_dict to ensure "id" and "pattern"
exist and are the expected types (raise a ValueError with a clear message if
missing/invalid) so RuleDefinition(...) is only constructed with valid data, and
then update load_rules_from_config to iterate over rule dicts (e.g.,
keyword_dicts) wrapping calls to _rule_from_dict in a try/except that catches
(ValueError, KeyError) and logs a warning like "Skipping malformed rule:
<error>" before continuing so malformed rules are skipped instead of failing the
entire load.
In `ai_council/sanitization/sanitization_filter.py`:

```python
if self._rate_limiter and self._rate_limiter.is_rate_limited(source_key):
    logger.warning(
        "[SANITIZATION] source_key='%s' is rate-limited (%d attempts in window).",
        source_key,
        self._rate_limiter.attempt_count(source_key),
    )
    return FilterResult(
        is_safe=False,
        triggered_rule="Rate limit exceeded — too many blocked requests",
        severity=Severity.HIGH,
        matched_text=None,
        filter_name="RateLimiter",
    )
```
Record blocked attempts even when already rate-limited.
The current early return skips `record_attempt()`, so repeated blocked traffic while throttled does not extend the sliding window.
Suggested fix
```diff
     if self._rate_limiter and self._rate_limiter.is_rate_limited(source_key):
+        self._rate_limiter.record_attempt(source_key)
         logger.warning(
             "[SANITIZATION] source_key='%s' is rate-limited (%d attempts in window).",
             source_key,
             self._rate_limiter.attempt_count(source_key),
         )
         return FilterResult(
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if self._rate_limiter and self._rate_limiter.is_rate_limited(source_key): | |
| logger.warning( | |
| "[SANITIZATION] source_key='%s' is rate-limited (%d attempts in window).", | |
| source_key, | |
| self._rate_limiter.attempt_count(source_key), | |
| ) | |
| return FilterResult( | |
| is_safe=False, | |
| triggered_rule="Rate limit exceeded — too many blocked requests", | |
| severity=Severity.HIGH, | |
| matched_text=None, | |
| filter_name="RateLimiter", | |
| ) | |
| if self._rate_limiter and self._rate_limiter.is_rate_limited(source_key): | |
| self._rate_limiter.record_attempt(source_key) | |
| logger.warning( | |
| "[SANITIZATION] source_key='%s' is rate-limited (%d attempts in window).", | |
| source_key, | |
| self._rate_limiter.attempt_count(source_key), | |
| ) | |
| return FilterResult( | |
| is_safe=False, | |
| triggered_rule="Rate limit exceeded — too many blocked requests", | |
| severity=Severity.HIGH, | |
| matched_text=None, | |
| filter_name="RateLimiter", | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ai_council/sanitization/sanitization_filter.py` around lines 142 - 154, The
early return in sanitization_filter.py skips recording blocked attempts when a
source is already rate-limited; before returning the FilterResult in the branch
that checks self._rate_limiter.is_rate_limited(source_key), call
self._rate_limiter.record_attempt(source_key) (or the appropriate RateLimiter
method that increments the sliding window) so repeated blocked traffic extends
the window, then log and return the FilterResult (keep the existing
logger.warning, attempt_count call and FilterResult/RateLimiter identifiers).
| "[SANITIZATION BLOCKED] source_key='%s' filter='%s' rule='%s' " | ||
| "severity='%s' matched='%s' elapsed=%.3fms", | ||
| source_key, | ||
| result.filter_name, | ||
| result.triggered_rule, | ||
| result.severity.value if result.severity else "n/a", | ||
| result.matched_text, | ||
| elapsed_ms, |
Do not log raw matched user text in security logs.
`result.matched_text` may contain sensitive user content. Logging it directly is a privacy/compliance risk.
Suggested fix
- "[SANITIZATION BLOCKED] source_key='%s' filter='%s' rule='%s' "
- "severity='%s' matched='%s' elapsed=%.3fms",
+ "[SANITIZATION BLOCKED] source_key='%s' filter='%s' rule='%s' "
+ "severity='%s' match_present=%s elapsed=%.3fms",
source_key,
result.filter_name,
result.triggered_rule,
result.severity.value if result.severity else "n/a",
- result.matched_text,
+ bool(result.matched_text),
elapsed_ms,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "[SANITIZATION BLOCKED] source_key='%s' filter='%s' rule='%s' " | |
| "severity='%s' matched='%s' elapsed=%.3fms", | |
| source_key, | |
| result.filter_name, | |
| result.triggered_rule, | |
| result.severity.value if result.severity else "n/a", | |
| result.matched_text, | |
| elapsed_ms, | |
| "[SANITIZATION BLOCKED] source_key='%s' filter='%s' rule='%s' " | |
| "severity='%s' match_present=%s elapsed=%.3fms", | |
| source_key, | |
| result.filter_name, | |
| result.triggered_rule, | |
| result.severity.value if result.severity else "n/a", | |
| bool(result.matched_text), | |
| elapsed_ms, |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ai_council/sanitization/sanitization_filter.py` around lines 163 - 170, The
log currently writes raw sensitive content via result.matched_text in the
SANITIZATION BLOCKED message; update the logging in sanitization_filter.py
(where the message is constructed) to omit result.matched_text and instead
include a non-sensitive surrogate such as a redaction token or safe metadata
(e.g., matched_length or a deterministic hash like sha256 of
result.matched_text) and ensure the format string and accompanying arguments are
updated accordingly; keep
severity/result.filter_name/result.triggered_rule/elapsed_ms unchanged but
remove any direct user content from the log entry.
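The hash-based redaction the prompt mentions can be sketched as a small helper. This is an illustration, not project code; the field name `matched_text` comes from the review's `FilterResult`.

```python
import hashlib

def redact(matched_text):
    """Return non-sensitive log metadata instead of raw matched user content."""
    if not matched_text:
        return "n/a"
    digest = hashlib.sha256(matched_text.encode("utf-8")).hexdigest()[:12]
    return f"<redacted len={len(matched_text)} sha256={digest}>"
```

The logger call would then pass `redact(result.matched_text)` in place of the raw field, keeping log lines correlatable (identical input yields the same hash prefix) without storing user content.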
In `config/sanitization_filters.yaml`:

```yaml
# ----------------------------------------------------------
# Rate-limit defaults (can be overridden programmatically)
# ----------------------------------------------------------
rate_limit:
  enabled: true
  max_attempts: 5      # blocked attempts before throttle flag
  window_seconds: 60   # sliding window in seconds
```
The rate_limit configuration is not loaded by the code.
Per `ai_council/sanitization/config_loader.py` (lines 209-210), `load_rules_from_config` only extracts `keyword_rules` and `regex_rules`. The `rate_limit` section is completely ignored; users editing these values will see no effect.

Either:

- Remove this section from the YAML (since it's misleading), or
- Extend `load_rules_from_config` to return rate-limit settings and wire them through `SanitizationFilter.from_config`
Option 1: Remove misleading config (quick fix)

```diff
-# ----------------------------------------------------------
-# Rate-limit defaults (can be overridden programmatically)
-# ----------------------------------------------------------
-rate_limit:
-  enabled: true
-  max_attempts: 5      # blocked attempts before throttle flag
-  window_seconds: 60   # sliding window in seconds
```
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| # ---------------------------------------------------------- | |
| # Rate-limit defaults (can be overridden programmatically) | |
| # ---------------------------------------------------------- | |
| rate_limit: | |
| enabled: true | |
| max_attempts: 5 # blocked attempts before throttle flag | |
| window_seconds: 60 # sliding window in seconds |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@config/sanitization_filters.yaml` around lines 191 - 197, The rate_limit
block in the YAML is never used because load_rules_from_config
(ai_council/sanitization/config_loader.py) only returns keyword_rules and
regex_rules; fix by either removing the rate_limit section from the YAML or
(preferred) extend load_rules_from_config to parse and return rate_limit
settings (e.g., as a dict alongside keyword_rules and regex_rules), update
SanitizationFilter.from_config to accept and apply that rate_limit dict, and
propagate the new return shape to all callers (adjusting any places that call
load_rules_from_config and SanitizationFilter.from_config to pass along the
rate-limit config).
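A minimal sketch of the preferred option. The return shape, key names, and defaults below are assumptions about how `load_rules_from_config` might be extended; they only need to match the YAML block shown above.

```python
# Defaults mirror the YAML block above; unknown keys are dropped.
DEFAULT_RATE_LIMIT = {"enabled": True, "max_attempts": 5, "window_seconds": 60}

def parse_rate_limit(config: dict) -> dict:
    """Merge a YAML-derived rate_limit block over defaults (hypothetical helper)."""
    raw = config.get("rate_limit") or {}
    return {key: raw.get(key, default) for key, default in DEFAULT_RATE_LIMIT.items()}
```

`load_rules_from_config` could then return this dict alongside the two rule lists, and `SanitizationFilter.from_config` would use it to construct the rate limiter.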
| print(f" Input : {user_input!r}") | ||
| result = await pipeline.process(user_input, session_id=session) | ||
| if "error" in result: | ||
| print(f" Outcome : BLOCKED") |
Remove the redundant f-string prefix.
Line 159 uses an f-string with no variable interpolation, which is unnecessary and fails lint checks (Ruff F541).
Suggested fix
- print(f" Outcome : BLOCKED")
+ print(" Outcome : BLOCKED")📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| print(f" Outcome : BLOCKED") | |
| print(" Outcome : BLOCKED") |
🧰 Tools
🪛 Ruff (0.15.7)
[error] 159-159: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@examples/sanitization_pipeline.py` at line 159, The print statement uses an
unnecessary f-string with no interpolation; locate the print call printing "
Outcome : BLOCKED" (print(f" Outcome : BLOCKED")) and remove the leading f so
it reads print(" Outcome : BLOCKED"), update any similar print statements in
the same function (e.g., the surrounding outcome/logging prints) to avoid
redundant f-strings to satisfy lint rules.
In `tests/test_sanitization.py`:

```python
def test_attempt_count(self):
    tracker = RateLimitTracker(max_attempts=10, window_seconds=60)
    for i in range(4):
```
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
rg -n 'for i in range\(4\):' tests/test_sanitization.pyRepository: shrixtacy/Ai-Council
Length of output: 94
🏁 Script executed:
sed -n '520,535p' tests/test_sanitization.pyRepository: shrixtacy/Ai-Council
Length of output: 626
🌐 Web query:
Ruff B007 rule unused loop variable
💡 Result:
Ruff's B007 rule, named "unused-loop-control-variable", detects unused loop control variables in for and while loops. It flags cases where a variable defined in the loop header (e.g., 'i' in 'for i, j in foo') is not used within the loop body, as this can confuse readers. Why it matters: Unused variables should be prefixed with an underscore (e.g., '_j') to signal intent, or removed if unnecessary. This follows PEP 8 naming conventions. Example violation: for i, j in foo: bar(i) # Flags B007 on 'j' Fixed: for i, j in foo: bar(i) The rule is derived from flake8-bugbear, added in Ruff v0.0.84, and is fixable in some cases. It checks usage only within the loop body, not outside (see open issues #10723, #14113 for nuances like external use or shadowing). Configuration: Controlled by lint.dummy-variable-rgx (e.g., to recognize '' as dummy). Enable via select=["B"] or specifically "B007".
Citations:
- 1: https://docs.astral.sh/ruff/rules/unused-loop-control-variable/
- 2: astral-sh/ruff#10723 — B007 guidance when variable is used outside the loop only
- 3: https://docs.astral.sh/ruff/rules/
- 4: astral-sh/ruff#13921 — B007: loop control variable `_` not used within loop body
- 5: astral-sh/ruff#14113 — B007 does not detect unused variables when they are shadowed or appear in an inner scope
Rename the unused loop variable to `_`.
`i` is unused and triggers Ruff B007. Per PEP 8 conventions, unused loop variables should be named `_`.
Suggested fix
- for i in range(4):
+ for _ in range(4):
      tracker.record_attempt("u")

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
    - for i in range(4):
    + for _ in range(4):
          tracker.record_attempt("u")
🧰 Tools
🪛 Ruff (0.15.7)
[warning] 526-526: Loop control variable i not used within loop body
Rename unused i to _i
(B007)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_sanitization.py` at line 526, the loop variable `i` in
`for i in range(4):` is unused and should be renamed to `_` to satisfy
Ruff B007 and PEP 8; update the loop header to `for _ in range(4):` wherever
that loop appears (keeping the loop body unchanged) so the intent of an
unused iteration variable is clear.
Pull request overview
Adds a new sanitization layer to the AI Council pipeline to detect likely prompt-injection inputs before prompt construction, with configurable keyword/regex rules and optional rate-limiting.
Changes:
- Introduces the `ai_council.sanitization` package (keyword filter, regex filter, config loader, rate-limit tracker, and chained `SanitizationFilter`).
- Adds default rule config at `config/sanitization_filters.yaml` plus an end-to-end integration example.
- Integrates sanitization as "Stage 0" in `AICouncil.process_request()` and adds a comprehensive test suite.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_sanitization.py | New unit/integration tests for keyword/regex filters, chaining, config fallback, and rate limiting. |
| examples/sanitization_pipeline.py | Demonstrates how to wire SanitizationFilter before prompt building/execution. |
| config/sanitization_filters.yaml | Default keyword/regex rules plus documented rate-limit defaults. |
| ai_council/sanitization/__init__.py | Exposes the sanitization public API. |
| ai_council/sanitization/base.py | Defines shared dataclasses/enums and structured error payload generation. |
| ai_council/sanitization/config_loader.py | Loads rules from YAML/JSON and defines built-in fallback rules. |
| ai_council/sanitization/keyword_filter.py | Implements substring-based keyword matching filter. |
| ai_council/sanitization/regex_filter.py | Implements precompiled regex matching filter with invalid-pattern skipping. |
| ai_council/sanitization/rate_limiter.py | Sliding-window blocked-attempt tracker used for throttling. |
| ai_council/sanitization/sanitization_filter.py | Chained filter orchestration + factory construction from config + rate-limit gate. |
| ai_council/main.py | Runs sanitization before orchestration and returns a failure FinalResponse when blocked. |
    return RuleDefinition(
        id=data["id"],
        pattern=data["pattern"],
        severity=severity,
_rule_from_dict() indexes required fields with data["id"] and data["pattern"], so a single malformed rule in a user-supplied config will raise KeyError and crash filter initialization (load_rules_from_config doesn’t catch this stage). Consider validating required fields and either skipping invalid entries with a warning or treating the entire config as invalid and falling back to defaults.
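The skip-with-warning option can be sketched as follows; the `RuleDefinition` stand-in and its field names mirror the snippet above but are illustrative, not the actual `ai_council` classes:

```python
import logging
from dataclasses import dataclass
from typing import Dict, Optional

logger = logging.getLogger(__name__)

@dataclass
class RuleDefinition:  # stand-in for ai_council.sanitization.base.RuleDefinition
    id: str
    pattern: str
    severity: str = "medium"
    description: str = ""

_REQUIRED_FIELDS = ("id", "pattern")

def rule_from_dict(data: Dict) -> Optional[RuleDefinition]:
    """Build a rule, or return None (with a warning) when required fields are absent."""
    missing = [field for field in _REQUIRED_FIELDS if field not in data]
    if missing:
        # Skip the malformed entry instead of letting KeyError crash initialization.
        logger.warning("Skipping malformed sanitization rule (missing %s): %r", missing, data)
        return None
    return RuleDefinition(
        id=data["id"],
        pattern=data["pattern"],
        severity=data.get("severity", "medium"),
        description=data.get("description", ""),
    )
```

Callers would then filter out the `None` entries, so one bad rule degrades gracefully rather than disabling the whole filter layer.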
    self._counters: Dict[str, _WindowedCounter] = defaultdict(
        lambda: _WindowedCounter(window_seconds=self._window_seconds)
    )
Using defaultdict here means that any read access like self._counters[key] will create a new counter entry. With many unique session/user IDs this can grow _counters unbounded over time even when there are no blocked attempts. Prefer a normal dict and only create counters in record_attempt().
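A minimal sketch of the write-only-allocation pattern, using a plain dict of timestamp deques in place of the PR's `_WindowedCounter` (whose internals are assumed, not copied):

```python
import time
from collections import deque
from typing import Deque, Dict

class RateLimitTracker:
    """Sliding-window tracker that only allocates per-key state on writes."""

    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0) -> None:
        self._max_attempts = max_attempts
        self._window_seconds = window_seconds
        self._attempts: Dict[str, Deque[float]] = {}  # plain dict: reads never create entries

    def record_attempt(self, key: str) -> None:
        window = self._attempts.setdefault(key, deque())  # only writes allocate
        now = time.monotonic()
        window.append(now)
        while window and now - window[0] > self._window_seconds:
            window.popleft()

    def is_rate_limited(self, key: str) -> bool:
        window = self._attempts.get(key)  # .get() avoids mutating state on reads
        if window is None:
            return False
        now = time.monotonic()
        while window and now - window[0] > self._window_seconds:
            window.popleft()
        return len(window) >= self._max_attempts
```

With this shape, probing thousands of never-blocked session IDs leaves `_attempts` empty, so memory stays proportional to keys that actually recorded blocked attempts.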
    "error": "Unsafe input detected. Request blocked due to potential prompt injection.",
    "details": {
        "filter": self.filter_name,
        "rule": self.triggered_rule,
        "severity": self.severity.value if self.severity else None,
FilterResult.error_response includes the triggered_rule string in the user-facing payload. Because KeywordFilter/RegexFilter fall back to embedding the raw keyword/regex pattern in triggered_rule when a rule has no description, this can leak detection patterns to end users and help attackers iterate around the filter. Consider returning only a generic message (and maybe a non-sensitive rule id/category) to callers, while keeping full match details only in logs/telemetry.
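One way to split the payload, sketched with a simplified `FilterResult` stand-in (field names are assumed from the diff above, and `rule_id` is a hypothetical non-sensitive identifier):

```python
import logging
from dataclasses import dataclass
from typing import Dict, Optional

logger = logging.getLogger(__name__)

@dataclass
class FilterResult:  # simplified stand-in for the PR's dataclass
    is_safe: bool
    filter_name: str = ""
    triggered_rule: Optional[str] = None  # may contain the raw pattern
    rule_id: Optional[str] = None         # safe to expose, e.g. "kw-001"

    def error_response(self) -> Dict:
        """User-facing payload: generic message plus a non-sensitive rule id only."""
        # Full match details (raw pattern, filter internals) go to logs, not callers.
        logger.info(
            "Input blocked by %s (rule_id=%s, detail=%r)",
            self.filter_name, self.rule_id, self.triggered_rule,
        )
        return {
            "error": "Unsafe input detected. Request blocked.",
            "details": {"rule_id": self.rule_id},  # no raw keyword/regex leaked
        }
```

The caller-visible response then never reveals which substring tripped the filter, which removes the feedback loop an attacker would use to iterate around it.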
        "id": "kw-009",
        "pattern": "you are now",
        "severity": "medium",
        "description": "Role reassignment (medium – may be benign)",
    },
    {
The built-in fallback keyword rules enable the very broad pattern "you are now". If the external YAML isn’t present (common in installed deployments unless explicitly packaged), this becomes the default behavior and may cause many false positives on benign prompts. Recommend disabling/removing this rule in built-ins (or matching the YAML where it’s disabled by default).
    -     "id": "kw-009",
    -     "pattern": "you are now",
    -     "severity": "medium",
    -     "description": "Role reassignment (medium – may be benign)",
    - },
    - {
    import logging
    import time
    from pathlib import Path
    from typing import List, Optional
Unused import: Optional is imported but not referenced in this module. Consider removing it to keep imports clean.
    - from typing import List, Optional
    + from typing import List
        return self._counters[key].count() >= self._max_attempts

    def attempt_count(self, key: str) -> int:
        """Return the current number of attempts within the window for *key*."""
        return self._counters[key].count()
is_rate_limited() currently reads via self._counters[key], which (with defaultdict) creates state for every key queried. Consider using self._counters.get(key) and returning False when absent to avoid mutating state on reads.
    -     return self._counters[key].count() >= self._max_attempts
    +     counter = self._counters.get(key)
    +     if counter is None:
    +         return False
    +     return counter.count() >= self._max_attempts

      def attempt_count(self, key: str) -> int:
          """Return the current number of attempts within the window for *key*."""
    -     return self._counters[key].count()
    +     counter = self._counters.get(key)
    +     if counter is None:
    +         return 0
    +     return counter.count()
    # Default path relative to the *repository root* (resolved at runtime)
    _DEFAULT_CONFIG: Path = Path(__file__).parents[2] / "config" / "sanitization_filters.yaml"
The default config path is computed as a repository-root-relative path (`Path(__file__).parents[2] / "config" / ...`), which likely won't exist in an installed wheel/site-packages environment. In that case the filter will silently fall back to built-in defaults, meaning production behavior differs from local/dev behavior. Consider packaging the YAML inside the Python package and loading it via `importlib.resources` (or similar).
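A sketch of the `importlib.resources` approach; it assumes the YAML were shipped as package data next to the module (e.g. `ai_council/sanitization/sanitization_filters.yaml`, declared in the project's packaging config), which is not how this PR lays it out:

```python
from importlib import resources

def read_package_text(package: str, filename: str) -> str:
    """Read a data file bundled inside *package*; works from wheels and source trees."""
    return resources.files(package).joinpath(filename).read_text(encoding="utf-8")

# Hypothetical use, if the YAML were packaged alongside the filter module:
# rules_yaml = read_package_text("ai_council.sanitization", "sanitization_filters.yaml")
```

`resources.files()` (Python 3.9+) resolves the file wherever the package is actually installed, so dev and production read the same defaults.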
    sanitization_cfg = data.get("sanitization", data)  # support nested or flat files

    keyword_dicts: List[Dict] = sanitization_cfg.get("keyword_rules", [])
    regex_dicts: List[Dict] = sanitization_cfg.get("regex_rules", [])
The config format (config/sanitization_filters.yaml) defines a sanitization.rate_limit section, but load_rules_from_config() only reads keyword_rules/regex_rules and drops any rate-limit settings. Either parse and return rate-limit settings too (so the YAML actually controls defaults), or remove the unused section to avoid a misleading configuration surface.
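The first option could look like this; the `rate_limit` key names (`max_attempts`, `window_seconds`) are assumed from the tracker's constructor, and the defaults shown are illustrative:

```python
from typing import Any, Dict

_RATE_LIMIT_DEFAULTS = {"max_attempts": 5, "window_seconds": 60}

def extract_rate_limit(data: Dict[str, Any]) -> Dict[str, Any]:
    """Pull rate-limit settings from a parsed config, falling back to defaults."""
    cfg = data.get("sanitization", data)  # support nested or flat files, as above
    settings = dict(_RATE_LIMIT_DEFAULTS)
    settings.update(cfg.get("rate_limit", {}))
    return settings
```

The loader would return these settings alongside the rule lists so `SanitizationFilter.from_config()` can pass them to the tracker, making the YAML section actually authoritative.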
    """
    resolved = config_path or _DEFAULT_CONFIG
    keyword_rules, regex_rules = load_rules_from_config(resolved)
from_config() treats config_path=None as “use the default repo-relative YAML file” (not “use embedded defaults”), which can be surprising and makes it hard to force built-in defaults when the file exists. Consider changing the API so None means embedded defaults, and provide an explicit sentinel/path for the external YAML; this also avoids differing behavior between source tree vs installed package.
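The sentinel-based API could be sketched as below; the names (`DEFAULT_YAML`, `resolve_config_source`) and the packaged YAML location are illustrative, not part of this PR:

```python
from pathlib import Path
from typing import Optional, Union

# Sentinel meaning "use the bundled default YAML"; distinct from None.
DEFAULT_YAML = object()

def resolve_config_source(config_path: Union[str, Path, object, None]) -> Optional[Path]:
    """Map the caller's intent to a config source.

    None         -> built-in embedded defaults (returns None)
    DEFAULT_YAML -> the bundled default YAML path (assumed location)
    anything else -> treated as an explicit path
    """
    if config_path is None:
        return None  # caller falls back to embedded default rules
    if config_path is DEFAULT_YAML:
        return Path(__file__).parent / "sanitization_filters.yaml"  # assumed packaged location
    return Path(config_path)
```

This makes "force built-in defaults" expressible even when the external file exists, and keeps behavior identical between a source checkout and an installed package.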
Pull Request
Description
This PR introduces a Sanitization Filter Layer to detect and block prompt injection attempts before prompt construction in the AI Council pipeline. The filter is integrated as Stage 0 in the request lifecycle to ensure unsafe inputs are prevented from reaching downstream components.
Type of Change
Related Issues
Fixes #190
Changes Made
- Added `ai_council/sanitization/` package:
  - `KeywordFilter` for fast substring-based detection
  - `RegexFilter` for precompiled regex-based detection
  - `RateLimiter` for sliding-window request limiting
- Added `config/sanitization_filters.yaml` with default keyword and regex rules
- Integrated the filter into `main.py` within `process_request()`
- Added `tests/test_sanitization.py` covering keyword/regex filters, filter chaining, config fallback, and rate limiting
- Added `examples/sanitization_pipeline.py`

Testing
Testing Details
Describe the tests you ran to verify your changes: