Feat/cost-optimized query pipeline #232
Conversation
- Add ai_council/sanitization/ package (KeywordFilter, RegexFilter, RateLimiter)
- Add config/sanitization_filters.yaml with 18 keyword + 10 regex rules
- Integrate filter as Stage 0 in main.py process_request()
- Add 60+ unit tests in tests/test_sanitization.py
- Add pipeline integration example in examples/sanitization_pipeline.py
- EmbeddingEngine: hash-based 384-dim embeddings, LRU cache, zero deps
- VectorStore: numpy brute-force search + FAISS adapter, 8 seeded topics
- TopicClassifier: weighted top-k NN vote, <1ms avg, 100% accuracy on test suite
- SmartQueryDecomposer: comma/bullet/numbered splits, dependency graph, toposort
- ModelRouter: cheap/mid/expensive tiers with topic adjustment + cost savings delta
- TokenOptimizer: RAG cherry-pick + boilerplate compression + hard trim
- QueryCache: two-level LRU + diskcache, sha256 key, TTL, hit/miss stats
- QueryPipeline: 10-stage async orchestrator with CostReport + observability
- config/query_pipeline.yaml: production YAML config
- tests/test_query_pipeline.py: 80+ unit tests covering all components
- tmp/validate_query_pipeline.py: standalone validator, 88/88 passing
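The hash-based EmbeddingEngine with an LRU cache described in the list above can be sketched roughly as follows. The 384-dim default comes from the summary; everything else (the token regex, md5 bucketing, sign trick, normalization) is an illustrative assumption, not the PR's actual implementation:

```python
import hashlib
import math
import re
from collections import OrderedDict

def hash_embed(text: str, dim: int = 384) -> list[float]:
    """Feature-hash tokens into a fixed-size, L2-normalized vector."""
    vec = [0.0] * dim
    for token in re.findall(r"[a-z0-9']+", text.lower()):
        digest = hashlib.md5(token.encode()).digest()
        bucket = int.from_bytes(digest[:4], "big") % dim
        sign = 1.0 if digest[4] % 2 == 0 else -1.0  # signed hashing reduces collision bias
        vec[bucket] += sign
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm for v in vec] if norm else vec

class LRUEmbeddingCache:
    """Tiny LRU wrapper so repeated queries skip re-embedding."""
    def __init__(self, capacity: int = 1024):
        self._capacity = capacity
        self._cache: OrderedDict[str, list[float]] = OrderedDict()

    def embed(self, text: str) -> list[float]:
        if text in self._cache:
            self._cache.move_to_end(text)  # mark as recently used
            return self._cache[text]
        vec = hash_embed(text)
        if self._capacity > 0:
            if len(self._cache) >= self._capacity:
                self._cache.popitem(last=False)  # evict least-recently-used entry
            self._cache[text] = vec
        return vec
```

Hash embeddings like this need no model weights, which is presumably what "zero deps" refers to, at the cost of no real semantic similarity beyond shared tokens.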
📝 Walkthrough

This pull request introduces a cost-optimized query processing pipeline with topic classification, multi-part query decomposition, and intelligent model routing to reduce costs, alongside a sanitization/prompt-injection filtering layer integrated into the main AICouncil class. Both subsystems are configurable, extensively tested, and demonstrated via example scripts.
Sequence Diagram(s)

sequenceDiagram
participant User
participant AICouncil
participant SanitizationFilter
participant QueryPipeline
participant EmbeddingEngine
participant VectorStore
participant TopicClassifier
participant QueryDecomposer
participant ModelRouter
participant TokenOptimizer
participant QueryCache
User->>AICouncil: process_request(user_input, session_id)
AICouncil->>SanitizationFilter: check(user_input, source_key=session_id)
alt Input is unsafe
SanitizationFilter-->>AICouncil: FilterResult(is_safe=False)
AICouncil-->>User: FinalResponse(success=False, error_type=prompt_injection)
else Input is safe
SanitizationFilter-->>AICouncil: FilterResult(is_safe=True)
AICouncil->>QueryPipeline: process(user_input)
QueryPipeline->>QueryCache: lookup(user_input)
alt Cache hit
QueryCache-->>QueryPipeline: cached_result
QueryPipeline-->>AICouncil: PipelineResult(from_cache=True)
else Cache miss
QueryCache-->>QueryPipeline: None
QueryPipeline->>EmbeddingEngine: embed(user_input)
EmbeddingEngine-->>QueryPipeline: embedding_vector
QueryPipeline->>VectorStore: search_topk(embedding_vector)
VectorStore-->>QueryPipeline: SearchResult[]
QueryPipeline->>TopicClassifier: classify(user_input)
TopicClassifier-->>QueryPipeline: ClassificationResult
QueryPipeline->>QueryDecomposer: decompose(user_input, topic_hint)
QueryDecomposer-->>QueryPipeline: DecompositionResult(sub_queries)
QueryPipeline->>ModelRouter: route_all(sub_queries)
ModelRouter-->>QueryPipeline: RouterResult(decisions, cost_report)
loop For each sub-query
QueryPipeline->>TokenOptimizer: optimize(sub_query, context, budget)
TokenOptimizer-->>QueryPipeline: OptimizedPrompt
end
QueryPipeline->>QueryCache: store(user_input, result)
QueryCache-->>QueryPipeline: (stored)
QueryPipeline-->>AICouncil: PipelineResult(cost_report, decisions)
end
AICouncil-->>User: FinalResponse(success=True, content, cost_info)
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
✅ Issue Link Status: this PR by @ishanraj953 is linked to its tracking issue(s).
Pull request overview
Adds a new cost-optimized query processing subsystem (ai_council/query_pipeline) and introduces a configurable prompt-injection sanitization layer (ai_council/sanitization) that is wired into AICouncil.process_request.
Changes:
- Introduces a standalone query pipeline with embeddings, vector search, topic classification, query decomposition, model routing, token optimization, and caching.
- Adds a sanitization filter layer (keyword + regex + rate limiting) with YAML-configured rules, plus examples and tests.
- Extends the main application entrypoint to run sanitization before orchestration.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| tmp/validate_query_pipeline.py | Standalone validation script for exercising the query pipeline components end-to-end. |
| tests/test_sanitization.py | Unit tests for sanitization filters, rule loading, and rate-limit behavior. |
| tests/test_query_pipeline.py | Unit tests covering the query pipeline components and end-to-end orchestration. |
| examples/sanitization_pipeline.py | Demonstrates integrating SanitizationFilter before prompt construction/execution. |
| examples/query_pipeline_demo.py | Demonstrates each stage of the cost-optimized query pipeline. |
| config/sanitization_filters.yaml | Default keyword/regex rules and rate-limit defaults for prompt-injection detection. |
| config/query_pipeline.yaml | Default configuration for embedding/vector store/routing tiers/cache/pipeline limits. |
| ai_council/sanitization/sanitization_filter.py | Implements chained sanitization + optional rate limiting and config-based construction. |
| ai_council/sanitization/regex_filter.py | Regex-based filter with precompiled patterns and invalid-regex skipping. |
| ai_council/sanitization/rate_limiter.py | In-memory sliding-window rate-limit tracker for repeated blocked attempts. |
| ai_council/sanitization/keyword_filter.py | Case-insensitive substring keyword filter. |
| ai_council/sanitization/config_loader.py | Loads sanitization rules from YAML/JSON with built-in fallback rules. |
| ai_council/sanitization/base.py | Base filter abstractions, rule/result dataclasses, and structured error payload. |
| ai_council/sanitization/`__init__.py` | Exposes sanitization public API surface. |
| ai_council/query_pipeline/vector_store.py | In-memory vector store (NumPy; optional FAISS) with seeded topic exemplars. |
| ai_council/query_pipeline/topic_classifier.py | Weighted top-k voting classifier returning topic + confidence + context. |
| ai_council/query_pipeline/token_optimizer.py | Token optimization (RAG cherry-pick, compression, hard trimming). |
| ai_council/query_pipeline/query_decomposer.py | Decomposes queries into scored sub-queries with dependency ordering. |
| ai_council/query_pipeline/pipeline.py | Orchestrates pipeline stages, execution waves, aggregation, and cost reporting. |
| ai_council/query_pipeline/model_router.py | Routes sub-queries to model tiers and computes cost deltas. |
| ai_council/query_pipeline/embeddings.py | Hash-based embedding backend with optional sentence-transformers backend and LRU cache. |
| ai_council/query_pipeline/config.py | YAML/JSON config loader for query pipeline settings. |
| ai_council/query_pipeline/cache.py | Two-level (memory + optional diskcache) query result cache with TTL and stats. |
| ai_council/query_pipeline/`__init__.py` | Exposes query pipeline public API surface. |
| ai_council/main.py | Wires sanitization into AICouncil.process_request and adds session_id for rate limiting. |
```python
latency.optimization_ms = sum(
    sr.optimized_prompt.optimized_tokens / 1_000 for sr in sub_results
)
latency.execution_ms = (time.perf_counter() - t0) * 1_000
```

`latency.optimization_ms` is currently derived from token counts (`optimized_tokens / 1000`) rather than elapsed time, so it is not a latency metric and can be misleading in observability. Measure optimization time with `perf_counter()` around the optimizer calls (or accumulate `SubQueryResult` timing fields) and store milliseconds here.

Suggested change:

```python
segment_ms = (time.perf_counter() - t0) * 1_000
# NOTE: We no longer derive optimization latency from token counts, as this
# is misleading for a "*_ms" metric. If precise optimization timing is
# needed, it should be measured around the optimizer calls themselves.
latency.optimization_ms = 0.0
latency.execution_ms = segment_ms
```
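The pattern this comment recommends, measuring wall-clock time around the work itself, can be sketched generically; `timed_ms` and the stand-in workload are illustrative names, not the pipeline's actual API:

```python
import time

def timed_ms(fn, *args, **kwargs):
    """Run fn and return (result, elapsed wall-clock milliseconds)."""
    t0 = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, (time.perf_counter() - t0) * 1_000

# Accumulate real optimization latency across sub-queries instead of
# deriving a pseudo-latency from token counts.
optimization_ms = 0.0
for sub_query in ["summarize part one", "translate part two"]:
    _, ms = timed_ms(str.upper, sub_query)  # stand-in for the real optimizer call
    optimization_ms += ms
```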
```python
return self._counters[key].count() >= self._max_attempts

def attempt_count(self, key: str) -> int:
    """Return the current number of attempts within the window for *key*."""
    return self._counters[key].count()
```

`RateLimitTracker.is_rate_limited()` and `.attempt_count()` access `self._counters[key]`, which creates a new `_WindowedCounter` for every previously unseen key (even for safe traffic). This can lead to unbounded memory growth with high-cardinality `source_key` values. Consider using `self._counters.get(key)` and treating missing keys as zero attempts without inserting into the dict.

Suggested change:

```python
counter = self._counters.get(key)
if counter is None:
    # No recorded attempts for this key; treat as not rate-limited.
    return False
return counter.count() >= self._max_attempts

def attempt_count(self, key: str) -> int:
    """Return the current number of attempts within the window for *key*."""
    counter = self._counters.get(key)
    if counter is None:
        # No recorded attempts for this key; treat as zero attempts.
        return 0
    return counter.count()
```
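A self-contained sliding-window tracker illustrating the `.get()` read path, where lookups never materialize state for unseen keys; this is a sketch of the technique, not the PR's actual `RateLimitTracker`:

```python
import time
from collections import deque

class SlidingWindowTracker:
    def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0):
        self._max_attempts = max_attempts
        self._window = window_seconds
        self._counters: dict[str, deque] = {}  # plain dict: no auto-vivification

    def record(self, key: str) -> None:
        # Writes are the only place insertion happens.
        self._counters.setdefault(key, deque()).append(time.monotonic())

    def _count(self, key: str) -> int:
        q = self._counters.get(key)  # read path: never inserts
        if q is None:
            return 0
        cutoff = time.monotonic() - self._window
        while q and q[0] < cutoff:
            q.popleft()  # drop attempts that fell outside the window
        return len(q)

    def is_rate_limited(self, key: str) -> bool:
        return self._count(key) >= self._max_attempts

    def attempt_count(self, key: str) -> int:
        return self._count(key)
```

With this shape, probing millions of one-off keys on the read path leaves the dict empty, which is exactly the memory property the review asks for.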
```python
from ai_council.query_pipeline import QueryPipeline, PipelineConfig

pipeline = QueryPipeline.from_config()
```

The module docstring shows `pipeline = QueryPipeline.from_config()`, but `QueryPipeline` exposes `build()` (and no `from_config`). This makes the public API example incorrect for users. Update the docstring to use the actual factory method (or add the missing `from_config` alias).

Suggested change:

```python
pipeline = QueryPipeline.build()
```
```python
import sys, types

for _stub in ("structlog", "diskcache", "pydantic", "redis",
              "httpx", "tenacity", "python_json_logger"):
    if _stub not in sys.modules:
        sys.modules[_stub] = types.ModuleType(_stub)

_sl = sys.modules["structlog"]
_sl.get_logger = lambda *a, **kw: __import__("logging").getLogger("stub")
_sl.stdlib = types.ModuleType("structlog.stdlib")
sys.modules["structlog.stdlib"] = _sl.stdlib
```

This test module conditionally stubs core dependencies by checking `sys.modules` (not package availability). If these deps haven't been imported yet, the stubs will mask the real installed packages and can break other tests/imports in the same session. Prefer importing normally and only stubbing on `ImportError` (or via `monkeypatch` in a fixture that is reverted after the test).

Suggested change:

```python
import sys, types, logging

# Try to import structlog; if unavailable, install a minimal stub.
try:
    import structlog  # type: ignore
except ImportError:
    structlog = types.ModuleType("structlog")
    structlog.get_logger = lambda *a, **kw: logging.getLogger("stub")
    structlog.stdlib = types.ModuleType("structlog.stdlib")
    sys.modules["structlog"] = structlog
    sys.modules["structlog.stdlib"] = structlog.stdlib

# For other optional deps, only stub if the real package truly is unavailable.
for _stub in ("diskcache", "pydantic", "redis", "httpx", "tenacity", "python_json_logger"):
    try:
        __import__(_stub)
    except ImportError:
        sys.modules[_stub] = types.ModuleType(_stub)
```
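The reverted-stub approach this comment suggests can also be done with `unittest.mock.patch.dict` on `sys.modules`, which restores the original import state when the context exits; the module name here is hypothetical:

```python
import sys
import types
from unittest import mock

def test_with_stubbed_dependency():
    stub = types.ModuleType("fake_dependency")
    stub.get_logger = lambda *a, **kw: None
    # patch.dict temporarily inserts the stub and restores sys.modules on exit,
    # so other tests in the same session see the real import state.
    with mock.patch.dict(sys.modules, {"fake_dependency": stub}):
        import fake_dependency  # resolves to the stub inside the context
        assert fake_dependency.get_logger() is None
    assert "fake_dependency" not in sys.modules  # stub removed on exit
```

pytest's `monkeypatch.setitem(sys.modules, ...)` gives the same automatic revert per test, which is the fixture-based variant the comment mentions.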
```python
def from_config(
    cls,
    config_path: Path | str | None = None,
    *,
    enable_rate_limit: bool = True,
    rate_limit_max: int = 5,
    rate_limit_window: float = 60.0,
) -> "SanitizationFilter":
    """Build a :class:`SanitizationFilter` from a YAML/JSON config file.

    Falls back to built-in default rules when *config_path* is not found.

    Args:
        config_path: Path to ``sanitization_filters.yaml`` (or JSON).
            Defaults to ``config/sanitization_filters.yaml``
            next to the repo root.
    """
    resolved = config_path or _DEFAULT_CONFIG
    keyword_rules, regex_rules = load_rules_from_config(resolved)

    filters: List[BaseFilter] = [
        KeywordFilter(rules=keyword_rules),
        RegexFilter(rules=regex_rules),
    ]

    logger.info(
        "SanitizationFilter initialised with %d keyword rules and %d regex rules.",
        len(keyword_rules),
        len(regex_rules),
    )

    return cls(
        filters=filters,
        enable_rate_limit=enable_rate_limit,
        rate_limit_max=rate_limit_max,
        rate_limit_window=rate_limit_window,
    )
```

`config/sanitization_filters.yaml` includes a `rate_limit` section, but `SanitizationFilter.from_config()` currently ignores any rate-limit settings loaded from the config file and only uses the method parameters/defaults. If the config is intended to control rate limiting, parse and apply those values (or remove the config section to avoid a misleading configuration surface).
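One way to honor the config's `rate_limit` section, sketched against a plain parsed-config mapping; the key names (`enabled`, `max_attempts`, `window_seconds`) mirror the section described above but are assumptions, not the repo's actual schema:

```python
def rate_limit_kwargs(config: dict, *,
                      default_enabled: bool = True,
                      default_max: int = 5,
                      default_window: float = 60.0) -> dict:
    """Merge an optional rate_limit config section with the method defaults."""
    section = config.get("rate_limit") or {}
    return {
        "enable_rate_limit": bool(section.get("enabled", default_enabled)),
        "rate_limit_max": int(section.get("max_attempts", default_max)),
        "rate_limit_window": float(section.get("window_seconds", default_window)),
    }

# from_config() could then pass these through:
#   return cls(filters=filters, **rate_limit_kwargs(parsed_config))
```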
```python
# Sub-queries referencing 'it' should have depends_on populated
ref_sqs = [sq for sq in result.sub_queries if sq.depends_on]
# At least some should reference prior sub-queries
assert any(len(sq.depends_on) > 0 for sq in result.sub_queries[1:]) or True
# (dependencies are optional, just validate structure)
```

`assert any(...) or True` will always pass, so this test doesn't validate dependency assignment at all. If dependencies are intentionally optional, consider asserting invariants that must always hold (e.g., `dep < sq.index` for all deps) and remove the unconditional `or True`.

Suggested change:

```python
# Dependencies are optional; for any that exist, just validate structure.
```
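Why the original assertion is vacuous: `X or True` is truthy for every `X`, so the assert can never fail, whereas a backward-pointing dependency invariant actually constrains the data. A minimal illustration (the dict-based sub-queries are stand-ins for the real dataclasses):

```python
# Any expression `X or True` evaluates truthy, so this assert always passes,
# even when no sub-query has dependencies at all.
sub_queries_without_deps = [{"index": 0, "depends_on": []}, {"index": 1, "depends_on": []}]
assert any(len(sq["depends_on"]) > 0 for sq in sub_queries_without_deps) or True

# An invariant that must always hold: every dependency points to an earlier
# sub-query, which is what topological execution order requires.
sub_queries = [
    {"index": 0, "depends_on": []},
    {"index": 1, "depends_on": [0]},
    {"index": 2, "depends_on": [0, 1]},
]
assert all(dep < sq["index"] for sq in sub_queries for dep in sq["depends_on"])
```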
```python
vs_cfg = self._cfg.vector_store
self._store = VectorStore(self._engine, use_faiss=(vs_cfg.backend == "faiss"))
self._store.seed_default_topics()

self._classifier = TopicClassifier(
    self._engine,
    self._store,
    top_k=5,
    threshold=0.15,
)

self._decomposer = SmartQueryDecomposer(
    max_sub_queries=self._cfg.max_sub_queries
)
```

`PipelineConfig.vector_store` exposes `persist_path` and `n_exemplars_per_topic`, but `QueryPipeline` currently doesn't use either (it always calls `seed_default_topics()`, which embeds the full hardcoded exemplar set). Either wire these config fields into `VectorStore`/seeding behavior or remove them to avoid a configuration knob that has no effect.
```python
def __init__(
    self,
    tokenizer: Optional[TokenizerFn] = None,
    max_chunk_drop: float = 0.7,
):
    self._tok: TokenizerFn = tokenizer or _word_tokenizer
    self._max_chunk_drop = max_chunk_drop
```

`TokenOptimizer.__init__` accepts `max_chunk_drop` and stores it in `self._max_chunk_drop`, but that value is never used in the optimization logic. Either implement the intended drop-limit behavior (e.g., cap dropped chunks by fraction) or remove the parameter to avoid a misleading API.
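One plausible reading of the intended drop-limit behavior, capping the fraction of context chunks that trimming may discard; this is a sketch of intent under assumed semantics, not the repo's implementation:

```python
def trim_chunks(chunks, scores, token_budget, max_chunk_drop=0.7,
                count_tokens=lambda c: len(c.split())):
    """Drop lowest-scoring chunks until within budget, but never drop more
    than max_chunk_drop of the chunks, so some context always survives."""
    max_dropped = int(len(chunks) * max_chunk_drop)
    ranked = sorted(range(len(chunks)), key=lambda i: scores[i])  # worst first
    kept = set(range(len(chunks)))
    for i in ranked[:max_dropped]:
        if sum(count_tokens(chunks[j]) for j in kept) <= token_budget:
            break  # already within budget; stop dropping
        kept.discard(i)
    return [chunks[i] for i in sorted(kept)]
```

With `max_chunk_drop=0.0` the function becomes a no-op even when over budget, which makes the parameter observable in tests.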
```python
tier_cfgs = [
    TierConfig(
        name=ModelTier(t.name),
        complexity_max=t.complexity_max,
        preferred_models=t.preferred_models,
        token_budget=t.token_budget,
        fallback_tier=t.fallback_tier,
    )
    for t in pipeline_config.routing_tiers
]
```

`from_pipeline_config` builds `TierConfig` objects without setting `cost_per_1k_tokens`, so all cost estimates (and therefore `baseline_cost_usd` / savings) become 0 when the router is configured from YAML (the default pipeline path). Either extend `RoutingTierConfig`/YAML to include per-tier costs or carry over the router's `_DEFAULT_TIERS` cost values when not provided.

Suggested change:

```python
tier_cfgs: List[TierConfig] = []
for t in pipeline_config.routing_tiers:
    tier_name = ModelTier(t.name)
    # Prefer an explicit cost from the pipeline config if present; otherwise
    # fall back to the corresponding default tier's cost.
    explicit_cost = getattr(t, "cost_per_1k_tokens", None)
    if explicit_cost is not None:
        cost_per_1k_tokens = explicit_cost
    else:
        default_tier = next((dt for dt in _DEFAULT_TIERS if dt.name == tier_name), None)
        cost_per_1k_tokens = default_tier.cost_per_1k_tokens if default_tier is not None else 0.0
    tier_cfgs.append(
        TierConfig(
            name=tier_name,
            complexity_max=t.complexity_max,
            preferred_models=t.preferred_models,
            token_budget=t.token_budget,
            fallback_tier=t.fallback_tier,
            cost_per_1k_tokens=cost_per_1k_tokens,
        )
    )
```
```python
# ── Stage 2: Embedding ────────────────────────────────────────────────
t0 = time.perf_counter()
query_vec = self._engine.embed(query)
latency.embedding_ms = (time.perf_counter() - t0) * 1_000

# ── Stage 3: Vector search ────────────────────────────────────────────
t0 = time.perf_counter()
nn_results = self._store.search_topk(query_vec, k=5)
latency.vector_search_ms = (time.perf_counter() - t0) * 1_000

# ── Stage 4: Topic classification ─────────────────────────────────────
t0 = time.perf_counter()
classification = self._classifier.classify(query)
latency.classification_ms = (time.perf_counter() - t0) * 1_000
```

Stage 2+3 compute `query_vec` and `nn_results`, but Stage 4 (`TopicClassifier.classify`) recomputes the embedding and runs its own `search_topk` again. This duplicates work on the hot path. Consider either removing Stage 3 entirely and letting the classifier handle retrieval, or adding a classifier method that accepts a precomputed embedding / NN results.
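The "accept a precomputed embedding" option could look like this sketch; the class, `classify` keyword parameters, and the tuple shape of search results are all hypothetical names for illustration, not the PR's API:

```python
class TopicClassifierSketch:
    def __init__(self, engine, store, top_k: int = 5):
        self._engine = engine
        self._store = store
        self._top_k = top_k

    def classify(self, query: str, *, query_vec=None, nn_results=None):
        # Reuse Stage 2/3 outputs when the pipeline passes them in;
        # only recompute on the standalone path.
        if query_vec is None:
            query_vec = self._engine.embed(query)
        if nn_results is None:
            nn_results = self._store.search_topk(query_vec, k=self._top_k)
        # Weighted vote: sum similarity per topic, pick the max.
        weights = {}
        for topic, score in nn_results:
            weights[topic] = weights.get(topic, 0.0) + score
        return max(weights, key=weights.get)
```

The pipeline's hot path would then call `classify(query, query_vec=query_vec, nn_results=nn_results)` and pay for the embedding and search exactly once.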
Note: due to the large number of review comments, Critical severity comments were prioritized as inline comments.

🟠 Major comments (23)
ai_council/main.py (lines 128-154)

⚠️ Potential issue | 🟠 Major

Default `"anonymous"` turns rate limiting into a cross-user global bucket.

`SanitizationFilter.check()` uses `source_key` for throttling. With the new default, every caller that omits `session_id` is treated as the same client, so a few blocked requests can start throttling unrelated users or tests that rely on the default.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ai_council/main.py` around lines 128-154, the default `session_id="anonymous"` causes global rate limiting because `SanitizationFilter.check()` uses `source_key` for throttling. Change `session_id` to be optional (e.g., `session_id: Optional[str] = None`) and, inside the method that calls `SanitizationFilter.check()`, generate a per-request unique id when `session_id` is None (UUID or similar) so omitted callers do not share a single bucket; update the call site to pass the generated id and keep existing behavior when a real `session_id` is provided.

config/sanitization_filters.yaml (lines 94-108)
⚠️ Potential issue | 🟠 Major

These rules are too broad for a developer-facing hard block.

`developer mode`, `jailbreak`, and the base64 regex will all match normal support/debugging questions. Since `ai_council/main.py` now rejects on the first hit, benign requests like "How do I enable developer mode on Android?" or "Decode this base64 payload" will never reach the model. These should be downgraded to non-blocking signals or paired with stronger override/exfiltration context.

Also applies to: lines 185-189

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `config/sanitization_filters.yaml` around lines 94-108, the YAML contains overly broad hard-block rules (ids kw-014, kw-015, kw-016) that will cause `ai_council/main.py` to reject benign developer/support queries on first match. Change these entries from blocking/high severity to non-blocking signals (e.g., lower severity or add a "signal_only" flag) and/or add stronger context checks (require accompanying exfiltration/override markers or regex scopes) so matches only trigger an enforced block when paired with concrete exfiltration indicators; update the rule definitions for "developer mode", "do anything now", and "jailbreak" accordingly and mirror the same change for the rules noted at lines 185-189.

ai_council/main.py (lines 156-163)
⚠️ Potential issue | 🟠 Major

Don't emit raw `session_id` values to warning logs.

This field can easily be an email, auth subject, or other user identifier, so the blocked-request path becomes a PII sink. Hash or truncate it before logging.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ai_council/main.py` around lines 156-163, the warning currently logs raw `session_id`; change it to log an anonymized version by computing a non-reversible fingerprint (e.g., sha256 or another cryptographic hash) or a truncated token of `session_id` and use that value in the extra payload instead of the raw `session_id`. Update the call that builds the extra dict used by `self.logger.warning` so it supplies `anonymized_session_id` (derived from `session_id`) while leaving `filter_result.filter_name`, `filter_result.severity`, and `filter_result.triggered_rule` unchanged.

ai_council/query_pipeline/query_decomposer.py (lines 225-232)
⚠️ Potential issue | 🟠 Major

Blank input should not produce a zero-work decomposition.

This branch returns `sub_queries=[]` and `execution_order=[]`. `ai_council/query_pipeline/pipeline.py:426-444` builds execution waves from `execution_order`, so blank requests will silently execute nothing instead of failing fast or being handled explicitly.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ai_council/query_pipeline/query_decomposer.py` around lines 225-232, the current `if not query` branch returns an empty `DecompositionResult`, causing downstream code that builds execution waves from `execution_order` to silently do nothing. Instead, make empty input fail fast by raising a clear error (e.g., `ValueError` or a specific `InvalidQueryError`) when `query` is blank, so callers like the pipeline that expect an execution plan get an explicit failure to handle.

ai_council/query_pipeline/embeddings.py (lines 78-82)
⚠️ Potential issue | 🟠 Major

The default hash tokenizer is effectively English-only.

`[a-z0-9']+` strips or splits non-ASCII text, so many multilingual queries end up with garbage tokens or an all-zero embedding. That makes topic classification arbitrary for non-English users.

ai_council/query_pipeline/topic_classifier.py (lines 127-145)
⚠️ Potential issue | 🟠 Major

Fallback classifications keep the discarded topic's context.

`context_chunks` is captured before the threshold check. When lines 128-130 fall back to `general_qa`, the returned topic no longer matches the returned context, so downstream retrieval/prompting can still be biased by the rejected topic.

Suggested fix:

```diff
 # Apply threshold
 if winner_confidence < self._threshold:
     winner_topic = self._fallback
     winner_confidence = 0.0
+    context_chunks = []
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ai_council/query_pipeline/topic_classifier.py` around lines 127-145, the code captures `context_chunks` before applying the confidence threshold, so when `winner_topic` is replaced by `self._fallback` the returned context still reflects the rejected topic. Update the logic so `context_chunks` matches the final topic by either moving its creation to after the threshold/fallback decision or explicitly clearing it (e.g., to an empty list) when `winner_confidence < self._threshold`; ensure the returned `ClassificationResult(topic=winner_topic, context_chunks=...)` uses the adjusted `context_chunks`.

ai_council/query_pipeline/embeddings.py (lines 183-188)
⚠️ Potential issue | 🟠 Major

Unknown embedding backends should fail fast.

`from_config()` currently treats anything other than `"sentence_transformers"` as the hash backend. That means the documented `"openai"` option in `config/query_pipeline.yaml` (or a simple typo) silently routes requests through the wrong embedding path.

Suggested fix:

```diff
 @classmethod
 def from_config(cls, backend: str = "hash", model_name: str = "hash-384",
                 dim: int = 384, cache_size: int = 1024) -> "EmbeddingEngine":
     if backend == "sentence_transformers":
         return cls(backend=SentenceTransformerBackend(model_name), cache_size=cache_size)
-    # Default: hash
-    return cls(backend=HashEmbeddingBackend(dim=dim), cache_size=cache_size)
+    if backend == "hash":
+        return cls(backend=HashEmbeddingBackend(dim=dim), cache_size=cache_size)
+    raise ValueError(f"Unsupported embedding backend: {backend}")
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ai_council/query_pipeline/embeddings.py` around lines 183-188, `from_config` currently treats any non-`"sentence_transformers"` backend as `HashEmbeddingBackend`, which hides typos or unsupported backends. Update `EmbeddingEngine.from_config` to explicitly branch on supported values (`"sentence_transformers"` -> `SentenceTransformerBackend`, `"hash"` -> `HashEmbeddingBackend`) and raise a clear `ValueError` for any other backend string (including misspellings, or `"openai"` if not implemented) so failures are fast and explicit.

ai_council/query_pipeline/embeddings.py (lines 209-212)
⚠️ Potential issue | 🟠 Major

Allow `cache_size=0` to disable caching cleanly.

With `cache_size == 0`, the eviction check is true on the first miss and `popitem()` raises on the empty `OrderedDict`. This turns a common "disable the cache" setting into a runtime failure.

Suggested fix:

```diff
 # LRU eviction
-if len(self._cache) >= self._cache_size:
-    self._cache.popitem(last=False)
-self._cache[key] = vec
+if self._cache_size > 0:
+    if len(self._cache) >= self._cache_size:
+        self._cache.popitem(last=False)
+    self._cache[key] = vec
 return vec
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ai_council/query_pipeline/embeddings.py` around lines 209-212, the LRU logic calls `popitem()` even when `self._cache_size == 0`, which raises on an empty `OrderedDict`. Change the eviction block so it first checks whether `self._cache_size > 0` and only performs eviction and assignment when caching is enabled, otherwise skips touching `self._cache` entirely so `cache_size=0` cleanly disables caching.

ai_council/main.py (lines 127-154)
⚠️ Potential issue | 🟠 Major

Sanitization still happens after raw-input logging.

The new docstring says the filter runs first, but line 149 logs `user_input` before line 152 calls `self.sanitization_filter.check(...)`. That means blocked prompt-injection payloads and pasted secrets are already in logs before Stage 0 can stop them.

Suggested fix:

```diff
-    self.logger.info("Processing request in", extra={"value": execution_mode.value})
-    self.logger.debug("User input", extra={"user_input": user_input[:200]})
-
     # ── Stage 0: Sanitization Filter ─────────────────────────────────
     filter_result = self.sanitization_filter.check(
         user_input, source_key=session_id
     )
     if not filter_result.is_safe:
@@
             error_type="prompt_injection",
         )
     # ─────────────────────────────────────────────────────────────────
+
+    self.logger.info("Processing request in", extra={"value": execution_mode.value})
+    self.logger.debug("User input", extra={"user_input": user_input[:200]})
     return await self._execute_with_timeout(user_input, execution_mode)
```

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ai_council/main.py` around lines 127-154, the code logs raw `user_input` before the sanitization filter runs, exposing blocked payloads. Move the sanitization step before any logging of the raw input, or redact the input before logging: call `self.sanitization_filter.check(user_input, source_key=session_id)` and inspect the result before calling `self.logger.debug("User input", ...)`, or log a redacted/safe version from the filter result, so no raw `user_input` is written to logs prior to the filter decision.

ai_council/query_pipeline/pipeline.py (lines 276-278)
⚠️ Potential issue | 🟠 Major

Remove duplicate vector search work or reuse it.

Line 277 computes `nn_results`, but the value is never used. This adds avoidable latency/cost per request in a path intended to optimize cost.

🤖 Prompt for AI Agents: Verify each finding against the current code and only fix it if needed. In `ai_council/query_pipeline/pipeline.py` around lines 276-278, the call to `self._store.search_topk(query_vec, k=5)` assigns `nn_results` but the result is never used, causing unnecessary vector-search work. Either remove this redundant call (and its timing block) or propagate `nn_results` into the subsequent logic that repeats the search, so the expensive search is performed only once.

ai_council/sanitization/rate_limiter.py (lines 55-57)
55-57: ⚠️ Potential issue | 🟠 Major
Avoid creating per-key state on read paths.
Line 65 and Line 69 read through `self._counters[key]`, which creates entries for every unseen key. In production, this can accumulate unbounded state for one-off `source_key` values.
Suggested fix
```diff
-from collections import defaultdict, deque
+from collections import deque
@@
-        self._counters: Dict[str, _WindowedCounter] = defaultdict(
-            lambda: _WindowedCounter(window_seconds=self._window_seconds)
-        )
+        self._counters: Dict[str, _WindowedCounter] = {}
+
+    def _get_counter(self, key: str, *, create: bool) -> _WindowedCounter | None:
+        ctr = self._counters.get(key)
+        if ctr is None and create:
+            ctr = _WindowedCounter(window_seconds=self._window_seconds)
+            self._counters[key] = ctr
+        return ctr
@@
-        self._counters[key].record()
+        self._get_counter(key, create=True).record()
@@
-        return self._counters[key].count() >= self._max_attempts
+        ctr = self._get_counter(key, create=False)
+        return ctr.count() >= self._max_attempts if ctr else False
@@
-        return self._counters[key].count()
+        ctr = self._get_counter(key, create=False)
+        return ctr.count() if ctr else 0
```

Also applies to: 63-69
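The read-vs-write asymmetry above can be sketched in isolation. This is a minimal, hypothetical stand-in (not the PR's actual classes): reads use `dict.get` and never allocate, while only the write path creates per-key state on demand.

```python
import time
from collections import deque


class WindowedCounter:
    """Counts events inside a sliding time window."""

    def __init__(self, window_seconds: float) -> None:
        self.window_seconds = window_seconds
        self._events = deque()

    def record(self) -> None:
        self._events.append(time.monotonic())

    def count(self) -> int:
        # Drop events that have aged out of the window before counting.
        cutoff = time.monotonic() - self.window_seconds
        while self._events and self._events[0] < cutoff:
            self._events.popleft()
        return len(self._events)


class Tracker:
    """Plain dict instead of defaultdict: reads never allocate state."""

    def __init__(self, window_seconds: float = 60.0) -> None:
        self._window = window_seconds
        self._counters = {}

    def record_attempt(self, key: str) -> None:
        ctr = self._counters.get(key)
        if ctr is None:  # allocate only on the write path
            ctr = self._counters[key] = WindowedCounter(self._window)
        ctr.record()

    def attempt_count(self, key: str) -> int:
        ctr = self._counters.get(key)  # read path: no allocation
        return ctr.count() if ctr else 0


t = Tracker()
t.attempt_count("never-seen")
assert "never-seen" not in t._counters  # the read did not create an entry
t.record_attempt("u")
assert t.attempt_count("u") == 1
```

With a `defaultdict`, the first `attempt_count("never-seen")` call would have created and retained a counter; with `dict.get`, probing unknown keys leaves the map untouched.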
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/rate_limiter.py` around lines 55 - 57, The current defaultdict-based self._counters creates a new _WindowedCounter for every unseen key on read, causing unbounded state; change self._counters to a plain dict and update all read sites that use self._counters[key] to use self._counters.get(key) (or an existence check) so no counter is created on reads, and only instantiate and assign a _WindowedCounter(window_seconds=self._window_seconds) into self._counters when performing a deliberate write/update (e.g., inside the increment/record/allow methods that enforce the rate limit).

ai_council/query_pipeline/pipeline.py-371-380 (1)
371-380: ⚠️ Potential issue | 🟠 Major
Exception fallback can fail again by re-running optimizer.
In the `Exception` branch at Line 371, the code calls `self._optimizer.optimize(...)` again (Line 377-380). If optimization was the original failure, this path can re-throw and collapse the whole pipeline.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/pipeline.py` around lines 371 - 380, When res is an Exception you must not re-run self._optimizer.optimize (which can re-throw); instead build the SubQueryResult using a safe fallback optimized prompt already produced earlier or a deterministic fallback (e.g., the original sq.text or a cached optimized_prompts_by_idx entry) so the pipeline doesn't retry optimization on failure. Locate the Exception branch where decomp.sub_queries and decisions_by_idx are used to create SubQueryResult and replace the call to self._optimizer.optimize(...) with a retrieval of the previously computed optimized prompt (or a safe fallback string) and keep routing/token_budget from decisions_by_idx when constructing the SubQueryResult.

ai_council/sanitization/sanitization_filter.py-101-103 (1)
101-103:⚠️ Potential issue | 🟠 Major
`from_config` ignores rate-limit settings from config content.
Line 102 only receives keyword/regex rules, while constructor values at Line 117-120 always come from method args/defaults. This makes YAML/JSON rate-limit config ineffective.
Also applies to: 115-120
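The plumbing this fix asks for can be sketched standalone. The key names below (`rate_limit`, `max_attempts`, `window_seconds`) are illustrative assumptions, not the PR's actual YAML schema: the rate-limit section is parsed from the same nested-or-flat mapping as the rules and returned alongside them, so the constructor no longer falls back to call-site defaults.

```python
def rate_limit_kwargs(data: dict) -> dict:
    """Extract rate-limit settings from a nested-or-flat sanitization config.

    Hypothetical helper; mirrors the 'support nested or flat files' behavior
    the loader already applies to keyword/regex rules.
    """
    cfg = data.get("sanitization", data) if isinstance(data, dict) else {}
    rl = cfg.get("rate_limit") or {}
    return {
        "max_attempts": int(rl.get("max_attempts", 5)),
        "window_seconds": float(rl.get("window_seconds", 60.0)),
    }


full = {"sanitization": {"rate_limit": {"max_attempts": 3, "window_seconds": 30}}}
assert rate_limit_kwargs(full) == {"max_attempts": 3, "window_seconds": 30.0}

# A missing section falls back to documented defaults instead of silently
# discarding whatever the file said.
assert rate_limit_kwargs({}) == {"max_attempts": 5, "window_seconds": 60.0}
```

`from_config` would then pass these kwargs into the constructor, making the YAML the single source of truth for throttling.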
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/sanitization_filter.py` around lines 101 - 103, from_config currently only reads keyword_rules and regex_rules from load_rules_from_config and then constructs the SanitizationFilter using the method args/defaults, so any rate-limit settings in the YAML/JSON are ignored; change load_rules_from_config (or add a small config parser) to also return the rate-limit settings object (e.g., rate_limit, rate_limit_span, max_hits or similar keys), then update from_config to accept that returned rate-limit dict and pass its values into the SanitizationFilter constructor (instead of using only the method args/defaults); refer to from_config, load_rules_from_config, and the SanitizationFilter constructor when making this change.

ai_council/sanitization/config_loader.py-143-157 (1)
143-157: ⚠️ Potential issue | 🟠 Major
Harden config/rule parsing against malformed payloads.
Line 144 assumes `severity` is a string, and Line 152/153 require `id` and `pattern`. Also, Line 207 assumes `data` is a dict. Malformed YAML/JSON can currently raise and crash initialization instead of safely skipping bad entries or falling back.
Suggested fix
```diff
-def _rule_from_dict(data: Dict[str, Any]) -> RuleDefinition:
-    severity_raw = data.get("severity", "high").lower()
+def _rule_from_dict(data: Dict[str, Any]) -> RuleDefinition | None:
+    if not isinstance(data, dict):
+        logger.warning("Invalid rule payload type '%s'; skipping.", type(data).__name__)
+        return None
+
+    rule_id = data.get("id")
+    pattern = data.get("pattern")
+    if not isinstance(rule_id, str) or not rule_id or not isinstance(pattern, str) or not pattern:
+        logger.warning("Invalid rule payload (missing non-empty id/pattern): %r", data)
+        return None
+
+    severity_raw = str(data.get("severity", "high")).lower()
@@
-    return RuleDefinition(
-        id=data["id"],
-        pattern=data["pattern"],
+    return RuleDefinition(
+        id=rule_id,
+        pattern=pattern,
         severity=severity,
         enabled=data.get("enabled", True),
         description=data.get("description", ""),
     )
@@
-    sanitization_cfg = data.get("sanitization", data)  # support nested or flat files
+    if not isinstance(data, dict):
+        logger.error("Sanitization config root must be a mapping; using defaults.")
+        return _build_default_rules()
+
+    sanitization_cfg = data.get("sanitization", data)  # support nested or flat files
+    if not isinstance(sanitization_cfg, dict):
+        logger.error("'sanitization' section must be a mapping; using defaults.")
+        return _build_default_rules()
@@
-    keyword_rules = [_rule_from_dict(d) for d in keyword_dicts]
-    regex_rules = [_rule_from_dict(d) for d in regex_dicts]
+    keyword_rules = [r for d in keyword_dicts if (r := _rule_from_dict(d)) is not None]
+    regex_rules = [r for d in regex_dicts if (r := _rule_from_dict(d)) is not None]
```

Also applies to: 207-213
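The skip-don't-crash behavior can be demonstrated standalone. A minimal sketch using a simplified `Rule` stand-in for the module's `RuleDefinition` (names and fields reduced for illustration):

```python
import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger("rules_sketch")


@dataclass
class Rule:  # stand-in for the PR's RuleDefinition
    id: str
    pattern: str
    severity: str = "high"


def rule_from_dict(data) -> Optional[Rule]:
    """Return None instead of raising on malformed payloads."""
    if not isinstance(data, dict):
        logger.warning("skipping non-dict rule payload: %r", data)
        return None
    rule_id = data.get("id")
    pattern = data.get("pattern")
    if not isinstance(rule_id, str) or not rule_id:
        logger.warning("skipping rule without non-empty id: %r", data)
        return None
    if not isinstance(pattern, str) or not pattern:
        logger.warning("skipping rule without non-empty pattern: %r", data)
        return None
    return Rule(id=rule_id, pattern=pattern,
                severity=str(data.get("severity", "high")).lower())


payloads = [{"id": "r1", "pattern": "x"}, "garbage", {"pattern": "no-id"}]
# Walrus-filtered comprehension: bad entries are dropped, good ones survive.
rules = [r for d in payloads if (r := rule_from_dict(d)) is not None]
assert [r.id for r in rules] == ["r1"]
```

A mixed payload list yields only the valid rule, so one corrupted YAML entry no longer takes down filter initialization.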
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/config_loader.py` around lines 143 - 157, The _rule_from_dict function should be hardened to validate input types and required keys: check that the incoming data is a dict, that "id" and "pattern" are present and are strings, and that "severity" (if present) is a string before calling Severity(...); on any validation failure return None (or raise a controlled ValueError) and log a warning with the offending value, default severity to Severity.HIGH only when severity is unrecognized, and keep enabled/description defaults; also update the caller that iterates rules (the code handling the block around the later data processing) to skip non-dict entries and ignore None returns from _rule_from_dict while logging invalid entries so malformed YAML/JSON does not crash initialization.

ai_council/sanitization/rate_limiter.py-52-57 (1)
52-57:⚠️ Potential issue | 🟠 Major
`RateLimitTracker` is not thread-safe.
Line 59, Line 63, and Line 67 can be called concurrently in server workloads. Without synchronization, counter mutation/check sequences can interleave and produce inconsistent throttling behavior.
Suggested fix
```diff
+import threading
@@
     def __init__(self, max_attempts: int = 5, window_seconds: float = 60.0):
         self._max_attempts = max_attempts
         self._window_seconds = window_seconds
+        self._lock = threading.RLock()
@@
     def record_attempt(self, key: str) -> None:
         """Record one blocked attempt for *key*."""
-        self._counters[key].record()
+        with self._lock:
+            self._counters[key].record()
@@
     def is_rate_limited(self, key: str) -> bool:
         """Return True if *key* has exceeded the allowed attempt count."""
-        return self._counters[key].count() >= self._max_attempts
+        with self._lock:
+            return self._counters[key].count() >= self._max_attempts
@@
     def attempt_count(self, key: str) -> int:
         """Return the current number of attempts within the window for *key*."""
-        return self._counters[key].count()
+        with self._lock:
+            return self._counters[key].count()
@@
     def reset(self, key: str) -> None:
         """Clear the attempt history for *key* (e.g. after allowing through)."""
-        self._counters.pop(key, None)
+        with self._lock:
+            self._counters.pop(key, None)
```

Also applies to: 59-73
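The lock-guarded pattern above can be exercised in isolation. A minimal hypothetical counter (much simpler than the PR's `_WindowedCounter`, names invented) shows the read-modify-write sequence staying consistent under concurrent threads:

```python
import threading


class SafeCounter:
    """Per-key counter whose mutation and check never interleave."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._counts = {}

    def record(self, key: str) -> None:
        with self._lock:  # read-modify-write is atomic w.r.t. other callers
            self._counts[key] = self._counts.get(key, 0) + 1

    def count(self, key: str) -> int:
        with self._lock:
            return self._counts.get(key, 0)


c = SafeCounter()
threads = [
    threading.Thread(target=lambda: [c.record("u") for _ in range(1000)])
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# With the lock, no increments are lost across 8 concurrent writers.
assert c.count("u") == 8000
```

An `RLock` (rather than a plain `Lock`) additionally tolerates methods that call each other while already holding the lock, which matches the suggested fix.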
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/rate_limiter.py` around lines 52 - 57, The RateLimitTracker is not thread-safe: add a dedicated lock (e.g., self._lock = threading.RLock()) in RateLimitTracker __init__ and import threading, then wrap every access and mutation of self._counters (creation via defaultdict lambda, lookups, increments and checks) inside with self._lock: blocks in the methods that reference self._counters (e.g., methods calling _WindowedCounter instances); if _WindowedCounter itself mutates internal state concurrently, add a lock there or ensure its public methods are invoked while holding RateLimitTracker's lock to prevent interleaving. Ensure the defaultdict lambda uses self._window_seconds but creation is protected by the same lock so two threads can't create duplicate entries concurrently.

ai_council/query_pipeline/pipeline.py-311-314 (1)
311-314:⚠️ Potential issue | 🟠 Major
`optimization_ms` is currently not a time metric.
Line 311-313 derives `optimization_ms` from token count (optimized_tokens / 1000), which misreports latency and can invalidate perf dashboards/alerts.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/pipeline.py` around lines 311 - 314, The code assigns latency.optimization_ms from token counts (sr.optimized_prompt.optimized_tokens) which is not a time metric; replace this with the actual measured optimization durations by summing per-sub-result timing fields (e.g., sum(sr.optimization_ms or sr.optimized_prompt.optimization_ms for sr in sub_results)) and ensure the values are in milliseconds, or if those fields don't exist, instrument the optimizer to record start/end times per sub_result and store sr.optimization_ms before summing; keep latency.execution_ms calculation unchanged (uses t0 and time.perf_counter()) and ensure consistent units.

examples/sanitization_pipeline.py-159-159 (1)
159-159: ⚠️ Potential issue | 🟠 Major
Remove unused f-string prefix (lint error).
Line 159 uses an f-string without placeholders, which triggers F541.
Suggested fix
```diff
-    print(f"  Outcome : BLOCKED")
+    print("  Outcome : BLOCKED")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/sanitization_pipeline.py` at line 159, Remove the unnecessary f-string prefix on the print statement that has no placeholders: replace print(f" Outcome : BLOCKED") with a normal string print(" Outcome : BLOCKED") in examples/sanitization_pipeline.py (identify the print call that outputs "Outcome : BLOCKED" to locate it).

ai_council/sanitization/sanitization_filter.py-162-170 (1)
162-170: ⚠️ Potential issue | 🟠 Major
Avoid logging raw matched content and identifiers.
Line 163-170 logs `source_key` and `matched_text` directly. This can leak user identifiers and sensitive prompt content to logs.
Suggested fix
```diff
         logger.warning(
-            "[SANITIZATION BLOCKED] source_key='%s' filter='%s' rule='%s' "
-            "severity='%s' matched='%s' elapsed=%.3fms",
+            "[SANITIZATION BLOCKED] source_key='%s' filter='%s' rule='%s' "
+            "severity='%s' matched_len=%d elapsed=%.3fms",
             source_key,
             result.filter_name,
             result.triggered_rule,
             result.severity.value if result.severity else "n/a",
-            result.matched_text,
+            len(result.matched_text or ""),
             elapsed_ms,
         )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/sanitization/sanitization_filter.py` around lines 162 - 170, The logger.warning call in sanitization_filter.py currently emits sensitive values (source_key and result.matched_text); change the logging to avoid raw identifiers and prompt content by redacting or hashing those fields before logging (e.g., replace source_key with a deterministic hash or pseudonym and replace result.matched_text with a length/seeded-hash or "[REDACTED]"), update the logger.warning invocation inside the function/method that emits the "[SANITIZATION BLOCKED]" message to use the sanitized_source_key and sanitized_matched_text variables instead of source_key and result.matched_text, and ensure severity/result.filter_name/result.triggered_rule remain intact for diagnostics.

ai_council/query_pipeline/pipeline.py-230-239 (1)
230-239: ⚠️ Potential issue | 🟠 Major
Do not silently swallow all exceptions in sync wrapper.
Line 237 catches all exceptions and drops them with `pass`, which obscures real failures and makes incident diagnosis difficult.
Suggested fix
```diff
-        except Exception:
-            pass
+        except Exception as exc:
+            logger.debug("Falling back to asyncio.run in process(): %s", exc)
         return asyncio.run(self.process_async(query, session_id))
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/pipeline.py` around lines 230 - 239, The sync wrapper currently swallows all exceptions in the try/except around the running-loop branch, obscuring real failures; update the block in the method that uses asyncio.get_event_loop()/loop.is_running() so you do not catch Exception blindly—catch only the specific condition you expect (e.g., RuntimeError or use the presence of a running loop) and either re-raise unexpected errors or log them with the exception details before propagating; specifically adjust the logic around asyncio.run(self.process_async(...)) and the ThreadPoolExecutor branch that calls asyncio.run(...) so failures from self.process_async(...) are not dropped—use targeted exception handling (or use asyncio.run_coroutine_threadsafe on the running loop) and ensure process-level logging of the caught exception and then re-raise.

ai_council/query_pipeline/config.py-32-38 (1)
32-38: ⚠️ Potential issue | 🟠 Major
Missing `cost_per_1k_tokens` field breaks cost reporting for YAML-configured tiers.
`RoutingTierConfig` lacks the `cost_per_1k_tokens` field that `TierConfig` in `model_router.py` expects. When `ModelRouter.from_pipeline_config()` constructs `TierConfig` instances from these objects (see `model_router.py:168-179`), the cost field is not mapped and silently defaults to `0.0`. This causes all YAML-loaded routing tiers to report zero costs, defeating the cost optimization reporting feature.
🔧 Proposed fix: Add cost_per_1k_tokens field
```diff
 @dataclass
 class RoutingTierConfig:
     name: str = "cheap"
     complexity_max: int = 3  # inclusive upper bound (0-10 scale)
     preferred_models: List[str] = field(default_factory=list)
     token_budget: int = 1024
     fallback_tier: Optional[str] = None
+    cost_per_1k_tokens: float = 0.0  # USD estimate for cost reporting
```

Also update `model_router.py:168-179` to map the field:

```diff
 TierConfig(
     name=ModelTier(t.name),
     complexity_max=t.complexity_max,
     preferred_models=t.preferred_models,
     token_budget=t.token_budget,
     fallback_tier=t.fallback_tier,
+    cost_per_1k_tokens=t.cost_per_1k_tokens,
 )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/config.py` around lines 32 - 38, RoutingTierConfig is missing the cost_per_1k_tokens field which causes YAML-loaded tiers to report zero cost; add a float field cost_per_1k_tokens: float = 0.0 to the RoutingTierConfig dataclass and update ModelRouter.from_pipeline_config (the code that constructs TierConfig instances) to pass the routing tier's cost_per_1k_tokens value through when creating TierConfig so the cost is preserved (i.e., map routing_tier.cost_per_1k_tokens into TierConfig).

tests/test_query_pipeline.py-297-300 (1)
297-300: ⚠️ Potential issue | 🟠 Major
Line 299 assertion is a tautology and never validates dependency assignment.
`... or True` makes the check always pass, so this test cannot catch dependency-graph regressions.
Proposed fix
```diff
-    assert any(len(sq.depends_on) > 0 for sq in result.sub_queries[1:]) or True
+    assert any(
+        len(sq.depends_on) > 0 for sq in result.sub_queries[1:]
+    ), "Expected at least one referential sub-query dependency"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_query_pipeline.py` around lines 297 - 300, The assertion on line with "assert any(len(sq.depends_on) > 0 for sq in result.sub_queries[1:]) or True" is a tautology; remove the "or True" and assert the actual condition instead (e.g., assert any(len(sq.depends_on) > 0 for sq in result.sub_queries[1:]) or assert len(ref_sqs) > 0) so the test validates that at least one sub-query in result.sub_queries (or in the ref_sqs list) has non-empty depends_on; update the assertion to reference result.sub_queries or ref_sqs directly and keep the comment about dependencies being optional if desired.

tests/test_query_pipeline.py-240-248 (1)
240-248: ⚠️ Potential issue | 🟠 Major
Latency assertion is too brittle for unit-test CI variability.
Line 247 enforces `< 50ms` average, which is prone to flaky failures on shared runners. This belongs in perf/benchmark tests or needs a more tolerant bound.
Proposed adjustment
```diff
-    assert avg_ms < 50.0, f"Average classification latency {avg_ms:.1f}ms exceeds 50ms"
+    assert avg_ms < 200.0, f"Average classification latency {avg_ms:.1f}ms exceeds 200ms"
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_query_pipeline.py` around lines 240 - 248, The test test_latency_under_50ms is too brittle — update the latency assertion to a more tolerant bound or move it to a perf benchmark: replace the strict assert avg_ms < 50.0 with a relaxed threshold (e.g., assert avg_ms < 200.0) or mark the test as a performance test (e.g., add a pytest.mark.performance or pytest.mark.slow) and skip it on CI; keep references to classifier.classify and the avg_ms calculation so you only change the numeric bound or test marker, not the timing logic.

tests/test_query_pipeline.py-620-623 (1)
620-623: ⚠️ Potential issue | 🟠 Major
Classification correctness test can silently pass when classification is missing.
The `if result.classification:` guard at Lines 620-623 hides failures. This test should fail if classification is absent.
Proposed fix
```diff
-        if result.classification:
-            assert result.classification.topic == expected_topic, (
-                f"'{query}' → got '{result.classification.topic}', expected '{expected_topic}'"
-            )
+        assert result.classification is not None
+        assert result.classification.topic == expected_topic, (
+            f"'{query}' → got '{result.classification.topic}', expected '{expected_topic}'"
+        )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_query_pipeline.py` around lines 620 - 623, The test currently skips assertions when classification is missing due to the `if result.classification:` guard; replace that guard with an explicit assertion that a classification exists (e.g., `assert result.classification is not None` or `assert result.classification` with a clear message including the query and expected_topic) and then assert `result.classification.topic == expected_topic` to ensure the test fails when classification is absent; update the block around `result.classification` in tests/test_query_pipeline.py accordingly.
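Both test-weakening patterns flagged above reduce to assertions that cannot fire. A short plain-Python demonstration, independent of the test suite:

```python
# `assert cond or True` parses as `assert (cond or True)`: the boolean
# `or` binds tighter than the assert statement, so the expression is
# always truthy and the assertion can never fail.
assert (False or True) is True

# Likewise, guarding an assert behind `if value:` means a missing value
# skips the check entirely instead of failing the test.
checked = []
result = None  # e.g. a classification that was never produced
if result:
    checked.append("ran")  # never reached

# The guarded block silently did nothing — no failure was reported.
assert checked == []
```

In both cases the test goes green for the wrong reason; moving the existence check into its own `assert result is not None` makes the absence itself a failure.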
🟡 Minor comments (7)
ai_council/query_pipeline/query_decomposer.py-128-133 (1)
128-133: ⚠️ Potential issue | 🟡 Minor
Make `_complexity_level()` multiline so Ruff stops failing.
Lines 129-132 are currently one-line `if` statements, which Ruff is already flagging as E701. If lint gates CI, this file will fail as-is.
Suggested fix
```diff
 def _complexity_level(score: int) -> ComplexityLevel:
-    if score <= 1: return ComplexityLevel.TRIVIAL
-    if score <= 3: return ComplexityLevel.SIMPLE
-    if score <= 5: return ComplexityLevel.MODERATE
-    if score <= 7: return ComplexityLevel.COMPLEX
+    if score <= 1:
+        return ComplexityLevel.TRIVIAL
+    if score <= 3:
+        return ComplexityLevel.SIMPLE
+    if score <= 5:
+        return ComplexityLevel.MODERATE
+    if score <= 7:
+        return ComplexityLevel.COMPLEX
     return ComplexityLevel.VERY_HIGH
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/query_decomposer.py` around lines 128 - 133, The one-line if statements in _complexity_level(score: int) violate Ruff E701; rewrite each single-line if into a multiline form (use separate lines with the condition and a return on the next line) so the function uses standard multi-line if blocks; locate the function _complexity_level and update the conditional branches that return ComplexityLevel.TRIVIAL, SIMPLE, MODERATE, COMPLEX, and VERY_HIGH accordingly.

ai_council/query_pipeline/query_decomposer.py-140-150 (1)
140-150: ⚠️ Potential issue | 🟡 Minor
The first numbered/bulleted fragment still keeps its marker.
Both split regexes only match after a newline, so input starting with `1.` or `-` leaves the first `SubQuery.text` as `"1. ..."` or `"- ..."`. That leaks list formatting into scoring and routing for the first task.
Suggested fix
```diff
 def _split_by_numbered_items(text: str) -> List[str]:
-    parts = re.split(r"\n\s*\d+[\.\)]\s+", text)
+    parts = re.split(r"(?:^|\n)\s*\d+[\.\)]\s+", text)
     if len(parts) >= 2:
         return [p.strip() for p in parts if p.strip()]
     return []


 def _split_by_bullets(text: str) -> List[str]:
-    parts = re.split(r"\n\s*[-*•]\s+", text)
+    parts = re.split(r"(?:^|\n)\s*[-*•]\s+", text)
     if len(parts) >= 2:
         return [p.strip() for p in parts if p.strip()]
     return []
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/query_decomposer.py` around lines 140 - 150, The split functions (_split_by_numbered_items and _split_by_bullets) only match list markers after a newline so the first fragment can retain its marker; update their regexes to also match markers at the start of the string (e.g., use a non-capturing group like (?:^|\n)\s*\d+[\.\)]\s+ and (?:^|\n)\s*[-*•]\s+ respectively) so the leading marker is removed from the first part, and also ensure _split_by_bullets returns [] when there are fewer than 2 parts (it currently lacks that return).

ai_council/query_pipeline/vector_store.py-344-355 (1)
344-355: ⚠️ Potential issue | 🟡 Minor
Inconsistent distance semantics between FAISS and NumPy backends.
`SearchResult.distance` is documented as L2 distance, but `_search_faiss()` stores the raw squared L2 from `faiss.IndexFlatL2.search()` without applying `sqrt`, while `_search_numpy()` correctly computes true L2 distance. This creates inconsistent values for the same field depending on which backend is active. Rankings remain unaffected (since both sort identically), but callers accessing `.distance` values will see different scales.
Suggested fix
```diff
             if tid not in seen_topics or sim > seen_topics[tid].similarity:
                 seen_topics[tid] = SearchResult(
                     topic_id=tid,
-                    distance=float(dist),
+                    distance=float(np.sqrt(max(0.0, float(dist)))),
                     similarity=sim,
                     context_chunks=self._context_map.get(tid, []),
                 )
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/vector_store.py` around lines 344 - 355, The FAISS backend (_search_faiss) stores squared L2 distances from faiss.IndexFlatL2.search(), causing SearchResult.distance to be inconsistent with the NumPy backend (_search_numpy) which uses true L2; fix by taking the square root of the returned distances before computing similarity and assigning SearchResult.distance (i.e., replace usages of raw dist from the FAISS result with math.sqrt(float(dist)) or similar), ensuring SearchResult.distance and the similarity calculation (1.0 - L2/4.0) use the true L2 distance.

ai_council/query_pipeline/__init__.py-23-25 (1)
23-25: ⚠️ Potential issue | 🟡 Minor
Fix public API example to match actual class methods.
Line 23 references `QueryPipeline.from_config()`, but the implemented factory is `QueryPipeline.build()`. The example also shows `await pipeline.process(...)` while `process` is synchronous.
Suggested fix
```diff
-    pipeline = QueryPipeline.from_config()
-    result = await pipeline.process("Explain quicksort and give Python code")
+    pipeline = QueryPipeline.build()
+    result = pipeline.process("Explain quicksort and give Python code")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/__init__.py` around lines 23 - 25, The example uses the wrong factory and wrong async usage: replace the call to QueryPipeline.from_config() with the actual factory QueryPipeline.build() and call the synchronous process method without await (i.e., use result = pipeline.process(...)), keeping the rest of the example (e.g., accessing result.cost_report) unchanged so the public API example matches the implemented QueryPipeline.build and process signatures.

tests/test_sanitization.py-524-528 (1)
524-528: ⚠️ Potential issue | 🟡 Minor
Unused loop variable `i`.
The loop variable `i` is not used in the loop body.
🔧 Proposed fix
```diff
     def test_attempt_count(self):
         tracker = RateLimitTracker(max_attempts=10, window_seconds=60)
-        for i in range(4):
+        for _ in range(4):
             tracker.record_attempt("u")
         assert tracker.attempt_count("u") == 4
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_sanitization.py` around lines 524 - 528, The test uses an unused loop variable `i` in test_attempt_count; change the loop to use a throwaway variable (e.g., `for _ in range(4)`) so the lint warning is removed and the intent of repeatedly calling RateLimitTracker.record_attempt("u") before asserting RateLimitTracker.attempt_count("u") == 4 remains clear.

examples/query_pipeline_demo.py-248-252 (1)
248-252: ⚠️ Potential issue | 🟡 Minor
Unused loop variable `label`.
The loop variable `label` is declared but not used in the loop body.
🔧 Proposed fix
```diff
     print(f"\n  {BOLD}Second run (cache hits){RESET}")
-    for label, query in DEMO_QUERIES[:2]:
+    for _, query in DEMO_QUERIES[:2]:
         result = pipeline.process(query)
         ok(f"'{query[:40]}' from_cache={result.from_cache}")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/query_pipeline_demo.py` around lines 248 - 252, The loop in the second run uses an unused loop variable `label` when iterating over DEMO_QUERIES; change the loop to only unpack the needed value (e.g., use `_` or unpack into a single variable) or iterate over just the queries so `pipeline.process(query)` is clear. Update the for-loop that currently reads "for label, query in DEMO_QUERIES[:2]:" to either "for _, query in DEMO_QUERIES[:2]:" or "for query in (q for _, q in DEMO_QUERIES[:2]):" so the unused `label` is removed while keeping use of pipeline.process(query) and ok(...).

ai_council/query_pipeline/cache.py-169-191 (1)
169-191: ⚠️ Potential issue | 🟡 Minor
Log exceptions in `invalidate` and `clear` for observability.
Lines 179-180 and 189-190 silently swallow exceptions. While cache operations should be resilient, logging helps diagnose disk issues.
🔧 Proposed fix
```diff
     def invalidate(self, query: str) -> bool:
         """Remove a single entry. Returns True if it existed."""
         key = _make_key(query)
         found = False
         if key in self._mem:
             del self._mem[key]
             found = True
         if self._disk is not None:
             try:
                 found = self._disk.delete(key) or found
-            except Exception:
-                pass
+            except Exception as exc:
+                logger.debug("QueryCache disk invalidate failed: %s", exc)
         return found

     def clear(self) -> None:
         """Clear all cached entries (memory + disk)."""
         self._mem.clear()
         if self._disk is not None:
             try:
                 self._disk.clear()
-            except Exception:
-                pass
+            except Exception as exc:
+                logger.debug("QueryCache disk clear failed: %s", exc)
         logger.info("QueryCache cleared.")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/cache.py` around lines 169 - 191, The except blocks in invalidate and clear (inside methods invalidate and clear) silently swallow exceptions from self._disk.delete(...) and self._disk.clear(), so update those except clauses to catch Exception as err and log the error with context via the module logger (e.g., logger.error("QueryCache invalidate disk delete failed for key %s: %s", key, err) and logger.exception or logger.error for clear with a descriptive message) while preserving existing behavior of not raising, ensuring you reference _disk.delete and _disk.clear and keep found semantics in invalidate.
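The intended resilient-but-observable behavior can be sketched in isolation. A toy two-level cache follows, with a hypothetical `FlakyDisk` standing in for the PR's `diskcache` adapter; the sha256 key scheme matches the PR description, the rest is illustrative:

```python
import hashlib
import logging

logger = logging.getLogger("query_cache_sketch")


def make_key(query: str) -> str:
    # sha256 of the normalized query, per the PR's stated key scheme
    return hashlib.sha256(query.strip().lower().encode("utf-8")).hexdigest()


class FlakyDisk:
    """Stand-in disk layer whose delete always raises."""

    def delete(self, key: str) -> bool:
        raise OSError("disk unavailable")


class TwoLevelCache:
    def __init__(self, disk=None) -> None:
        self._mem = {}
        self._disk = disk

    def put(self, query: str, value) -> None:
        self._mem[make_key(query)] = value

    def invalidate(self, query: str) -> bool:
        key = make_key(query)
        found = self._mem.pop(key, None) is not None
        if self._disk is not None:
            try:
                found = self._disk.delete(key) or found
            except Exception as exc:
                # logged for diagnosis, not silently dropped;
                # the memory-layer result still wins
                logger.debug("disk invalidate failed: %s", exc)
        return found


cache = TwoLevelCache(disk=FlakyDisk())
cache.put("What is RAG?", "answer")
assert cache.invalidate("what is rag?") is True   # normalization hits same key
assert cache.invalidate("What is RAG?") is False  # already gone; disk error logged
```

A broken disk layer degrades to memory-only behavior, but the failure now leaves a trace in the logs instead of vanishing.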
🧹 Nitpick comments (4)
ai_council/query_pipeline/token_optimizer.py (1)
209-214: Minor: EN DASH characters in comments.
Lines 209 and 214 contain EN DASH (–) instead of HYPHEN-MINUS (-). This is purely cosmetic but may cause issues in some editors.
🔧 Proposed fix
```diff
-    # Pass 1 – Relevance gate: skip chunks that are clearly irrelevant
+    # Pass 1 - Relevance gate: skip chunks that are clearly irrelevant
     # (keep at least the top-1 regardless of score)
     if rank > 0 and chunk_score < 0:
         continue
-    # Pass 2 – Budget gate
+    # Pass 2 - Budget gate
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ai_council/query_pipeline/token_optimizer.py` around lines 209 - 214, Update the two comment lines that use EN DASH characters by replacing each EN DASH (–) with a standard hyphen-minus (-); specifically, change the comments "Pass 1 – Relevance gate" and "Pass 2 – Budget gate" in token_optimizer.py so they read "Pass 1 - Relevance gate" and "Pass 2 - Budget gate" (search for those exact comment text near the relevance/budget gating logic to find the location).

tmp/validate_query_pipeline.py (1)
77-86: Consider splitting multi-statement lines if this script becomes permanent.
Lines 77, 84-85 use semicolons/colons for compactness. This is acceptable for a temporary validation script in `tmp/`, but if kept long-term, consider reformatting for consistency with project style.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tmp/validate_query_pipeline.py` around lines 77 - 86, The file uses compact multi-statement lines that hurt readability — split the combined statements into one-per-line: replace the single-line "PASS = 0; FAIL = 0" with two lines assigning PASS and FAIL separately, expand "def section(t): print(...)" into a multi-line function body with an explicit return, and expand "def chk(cond, label):" body (including the global PASS, FAIL, status assignment, print and the PASS/FAIL increments) into a standard indented block; update the functions named section and chk and the PASS/FAIL declarations accordingly so the code conforms to normal project style and avoids semicolons/one-liners.

examples/query_pipeline_demo.py (1)
69-85: Consider moving the numpy import to module level.
The `import numpy as np` inside `demo_embedding()` is functional but unconventional. Since numpy is already used elsewhere in the project, moving it to the module-level imports would be cleaner.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/query_pipeline_demo.py` around lines 69 - 85, Move the inline "import numpy as np" out of demo_embedding() and add "import numpy as np" to the module-level imports; update demo_embedding() to use the already-imported np when computing norm for vectors returned by EmbeddingEngine.default(). Ensure there are no duplicate imports and run linters/tests to confirm the module-level import is used everywhere that expects numpy.

tests/test_query_pipeline.py (1)
25-33: Consider adding module teardown to avoid stub leakage into future tests.
Lines 25-33 stub `sys.modules` at import time without restoration. While this is intentionally done for CI environments (as documented on line 22), the lack of a cleanup hook means that any future test importing real `pydantic`, `redis`, or `httpx` will receive stubs instead. Current tests do not import these modules, but this is fragile as the codebase grows.
Add a `teardown_module()` function to restore the original module state:
Proposed fix
```diff
+_ORIGINAL_OPTIONAL_MODULES = {
+    "structlog.stdlib": sys.modules.get("structlog.stdlib"),
+}
 for _stub in ("structlog", "diskcache", "pydantic", "redis", "httpx", "tenacity", "python_json_logger"):
+    _ORIGINAL_OPTIONAL_MODULES[_stub] = sys.modules.get(_stub)
     if _stub not in sys.modules:
         sys.modules[_stub] = types.ModuleType(_stub)
 _sl = sys.modules["structlog"]
 _sl.get_logger = lambda *a, **kw: __import__("logging").getLogger("stub")
 _sl.stdlib = types.ModuleType("structlog.stdlib")
 sys.modules["structlog.stdlib"] = _sl.stdlib
+
+def teardown_module():
+    for name, original in _ORIGINAL_OPTIONAL_MODULES.items():
+        if original is None:
+            sys.modules.pop(name, None)
+        else:
+            sys.modules[name] = original
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/test_query_pipeline.py` around lines 25 - 33, the test stubs inserted into `sys.modules` (via the loop that sets `sys.modules[_stub]` and the structlog shim `_sl`) leak into other tests. Add a `teardown_module()` that restores any replaced modules and removes any modules you added: before stubbing, capture the originals in a dict (e.g. `original_modules = {name: sys.modules.get(name) for name in ...}`), perform the existing stubbing using `sys.modules` and `_sl`, and implement `teardown_module()` to iterate that dict and either restore the original module or `del sys.modules[name]` if it was originally missing; also remove `sys.modules["structlog.stdlib"]` if you created it, ensuring no stub remains after the tests.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: cdbcceac-3bad-46d9-9526-8935c4ccd1e3
📒 Files selected for processing (25)
- ai_council/main.py
- ai_council/query_pipeline/__init__.py
- ai_council/query_pipeline/cache.py
- ai_council/query_pipeline/config.py
- ai_council/query_pipeline/embeddings.py
- ai_council/query_pipeline/model_router.py
- ai_council/query_pipeline/pipeline.py
- ai_council/query_pipeline/query_decomposer.py
- ai_council/query_pipeline/token_optimizer.py
- ai_council/query_pipeline/topic_classifier.py
- ai_council/query_pipeline/vector_store.py
- ai_council/sanitization/__init__.py
- ai_council/sanitization/base.py
- ai_council/sanitization/config_loader.py
- ai_council/sanitization/keyword_filter.py
- ai_council/sanitization/rate_limiter.py
- ai_council/sanitization/regex_filter.py
- ai_council/sanitization/sanitization_filter.py
- config/query_pipeline.yaml
- config/sanitization_filters.yaml
- examples/query_pipeline_demo.py
- examples/sanitization_pipeline.py
- tests/test_query_pipeline.py
- tests/test_sanitization.py
- tmp/validate_query_pipeline.py
Pull Request
Description
This PR introduces a Cost-Optimized Query Pipeline that enhances the AI Council system with intelligent query classification, decomposition, model routing, and token optimization.
The pipeline uses embeddings + vector DB for topic classification, decomposes queries into sub-tasks, assigns cost-efficient models dynamically, and optimizes token usage to reduce overall inference cost while maintaining output quality.
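As an illustration of the dynamic, cost-efficient model assignment described above, a minimal tier router might look like the following. Tier names, model names, prices, thresholds, and the topic rule are placeholders, not the PR's actual configuration:

```python
# Illustrative sketch of tier-based model routing; all values below
# are hypothetical and not taken from the PR's config.
TIERS = {
    "cheap":     {"model": "small-llm", "cost_per_1k_tokens": 0.0002},
    "mid":       {"model": "mid-llm",   "cost_per_1k_tokens": 0.0030},
    "expensive": {"model": "large-llm", "cost_per_1k_tokens": 0.0300},
}

def route(topic: str, complexity: float) -> str:
    """Pick a model tier from query complexity, nudged by topic."""
    if complexity < 0.3:
        tier = "cheap"
    elif complexity < 0.7:
        tier = "mid"
    else:
        tier = "expensive"
    # Topic adjustment (hypothetical rule): never send coding queries
    # to the cheapest tier.
    if topic == "coding" and tier == "cheap":
        tier = "mid"
    return TIERS[tier]["model"]

def savings_vs_baseline(tokens: int, tier: str, baseline: str = "expensive") -> float:
    """Cost delta (in dollars) versus always using the baseline tier."""
    per_token = lambda t: TIERS[t]["cost_per_1k_tokens"] / 1000
    return tokens * (per_token(baseline) - per_token(tier))
```

Routing a simple query to the cheap tier instead of the expensive baseline saves `0.0298` dollars per 1,000 tokens under these placeholder prices, which is the kind of per-query delta the pipeline's cost report aggregates.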
Type of Change
Related Issues
Fixes #187
Changes Made
New Package:
ai_council/query_pipeline/
- embeddings.py
- vector_store.py
- topic_classifier.py
- query_decomposer.py
- model_router.py
- token_optimizer.py
- cache.py
- pipeline.py (`PipelineResult` and `CostReport`)

Validation
Misclassification issue
Query decomposition issue (comma-separated inputs)
`_split_by_comma_list()` added; `min_fragment_len` lowered from 4 → 2

RAG inefficiency (irrelevant chunks not removed)
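A minimal sketch of the comma-list split behind this fix follows; the PR's actual `_split_by_comma_list()` may differ in details, and the fallback rule here is an assumption:

```python
def split_by_comma_list(query: str, min_fragment_len: int = 2) -> list[str]:
    """Split a comma-separated query into sub-task fragments."""
    fragments = [part.strip() for part in query.split(",")]
    # Drop fragments shorter than the threshold; lowering it from 4 to 2
    # (as in this PR) lets short items like "go" survive the split.
    kept = [f for f in fragments if len(f) >= min_fragment_len]
    # Fall back to the whole query if splitting produced nothing useful.
    return kept if len(kept) > 1 else [query]
```

Each fragment can then be classified and routed independently, which is what makes decomposition pay off for multi-part queries.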
Testing
Testing Details