Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions autobot-backend/api/knowledge_rag_feedback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# Copyright (c) mrveiss. All rights reserved.
# AutoBot - AI-Powered Automation Platform
# Author: mrveiss
"""
Knowledge Base RAG Annotation Feedback API (Issue #3240).

Receives explicit accept/reject annotation signals from the frontend
KnowledgeResearchPanel and records them as user-scoped retrieval feedback
events in the per-user Redis stream.

Endpoint:
- POST /rag-feedback — record an accept/reject annotation for a source card
"""

import json
import logging
import time
from datetime import datetime, timezone
from typing import Literal, Optional

from fastapi import APIRouter, Depends
from pydantic import BaseModel

from auth_middleware import get_current_user
from autobot_shared.error_boundaries import ErrorCategory, with_error_handling
from autobot_shared.redis_client import get_redis_client
from constants.ttl_constants import TTL_30_DAYS
from knowledge.search_components.retrieval_learner import GLOBAL_USER

logger = logging.getLogger(__name__)

router = APIRouter(tags=["knowledge-rag-feedback"])

# Redis stream TTL mirrors _store_feedback_in_stream (Issue #2102).
_STREAM_TTL_SECONDS = TTL_30_DAYS


class RagFeedbackRequest(BaseModel):
    """Annotation feedback submitted from the source card accept/reject UI."""

    # Canonical URL of the annotated source; also used as the chunk identifier
    # written into retrieved_chunk_ids / final_ranked_ids on the stream event.
    source_url: str
    # Human-readable title of the source card; optional, stored verbatim.
    title: str = ""
    # The search query that produced this source card.
    query: str
    # Explicit user verdict; validation rejects anything but these two values.
    decision: Literal["accepted", "rejected"]
    # Fallback identity for unauthenticated or service-level callers; the
    # authenticated JWT user id takes precedence when present.
    user_id: Optional[str] = None


@router.post("/rag-feedback")
@with_error_handling(
    category=ErrorCategory.SERVER_ERROR,
    operation="record_rag_feedback",
    error_code_prefix="KNOWLEDGE",
)
async def record_rag_feedback(
    body: RagFeedbackRequest,
    current_user: dict = Depends(get_current_user),
) -> dict:
    """Record a user's explicit accept/reject annotation for a retrieved source.

    Issue #3240: Writes to ``rag:feedback:{user_id}:{date}`` Redis stream so
    RetrievalLearner can consume the signal and update per-user retrieval
    patterns. The authenticated user's ID from the JWT is used; body.user_id
    is accepted as a fallback for unauthenticated or service-level callers.

    Returns:
        {"status": "recorded", "stream_key": "..."} on success.
    """
    # Resolve the identity scope: first truthy candidate wins, in order of
    # trust (JWT user_id, JWT id, caller-supplied body.user_id), else global.
    identity_candidates = (
        current_user.get("user_id"),
        current_user.get("id"),
        body.user_id,
    )
    uid = next((cand for cand in identity_candidates if cand), GLOBAL_USER)

    today = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")
    stream_key = f"rag:feedback:{uid}:{today}"

    # Encode the annotation as a pseudo-retrieval feedback event: an accepted
    # source yields a one-element ranked list (full positive trajectory),
    # a rejected source yields an empty one.
    ranked = [body.source_url] if body.decision == "accepted" else []
    event_fields = {
        "query_text": body.query,
        "retrieved_chunk_ids": json.dumps([body.source_url], ensure_ascii=False),
        "final_ranked_ids": json.dumps(ranked, ensure_ascii=False),
        "complexity": "simple",
        "annotation": body.decision,
        "title": body.title,
        "timestamp": str(time.time()),
    }

    try:
        redis = await get_redis_client(async_client=True, database="analytics")
        if redis is None:
            # Best-effort telemetry: a missing Redis connection drops the
            # annotation rather than failing the request.
            logger.warning(
                "record_rag_feedback: Redis unavailable; annotation dropped for user %s",
                uid,
            )
            return {"status": "skipped", "reason": "redis_unavailable"}

        await redis.xadd(stream_key, event_fields)
        # TTL refresh mirrors _store_feedback_in_stream (Issue #2102).
        await redis.expire(stream_key, _STREAM_TTL_SECONDS)
        logger.info(
            "record_rag_feedback: %s annotation written to %s",
            body.decision,
            stream_key,
        )
    except Exception as exc:
        logger.warning("record_rag_feedback: stream write failed: %s", exc)
        return {"status": "error", "reason": str(exc)}

    return {"status": "recorded", "stream_key": stream_key, "decision": body.decision}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
from api.knowledge_suggestions import router as knowledge_suggestions_router
from api.knowledge_tags import router as knowledge_tags_router
from api.knowledge_verification import router as knowledge_verification_router
from api.knowledge_rag_feedback import router as knowledge_rag_feedback_router
from api.llm import router as llm_router
from api.llm_providers import router as llm_providers_router
from api.mcp_registry import router as mcp_registry_router
Expand Down Expand Up @@ -115,6 +116,12 @@ def _get_core_knowledge_routers() -> list:
["knowledge-verification"],
"knowledge_verification",
),
(
knowledge_rag_feedback_router,
"/knowledge_base",
["knowledge-rag-feedback"],
"knowledge_rag_feedback",
),
]


Expand Down
110 changes: 80 additions & 30 deletions autobot-backend/knowledge/search_components/retrieval_learner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
"""
Retrieval Pattern Learner — closes the RAG feedback loop. Issue #2095.

Consumes rag:feedback:{date} Redis streams, scores retrieval trajectories by
user acceptance (rerank-position gain), distils successful patterns into
reusable Redis hashes, and exposes a query-time hint API so RAGService can
adapt its strategy based on historical evidence.
Consumes rag:feedback:{user_id}:{date} Redis streams (Issue #3240), scores
retrieval trajectories by user acceptance (rerank-position gain), distils
successful patterns into reusable Redis hashes namespaced per user, and
exposes a query-time hint API so RAGService can adapt its strategy based on
historical evidence. When no user-scoped pattern exists, lookup falls back
to the global (user_id="__global__") namespace.

Redis key layout
----------------
rag:retrieval_patterns:{pattern_hash} HASH — per-pattern metrics & hints
rag:rl:cursor:{date} STRING — last processed entry ID per day
rag:retrieval_patterns:{user_id}:{pattern_hash} HASH — per-user pattern metrics & hints
rag:retrieval_patterns:__global__:{pattern_hash} HASH — global fallback patterns
rag:rl:cursors HASH — last processed stream ID per key
"""

import hashlib
Expand Down Expand Up @@ -44,8 +47,10 @@
_XRANGE_BATCH = 100
# Redis cursor hash key (one entry per date key).
_CURSOR_HASH_KEY = "rag:rl:cursors"
# Hash prefix for distilled patterns.
# Hash prefix for distilled patterns (Issue #3240: namespaced by user_id).
_PATTERN_KEY_PREFIX = "rag:retrieval_patterns:"
# Sentinel used when no authenticated user is available (global/system scope).
GLOBAL_USER = "__global__"


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -181,22 +186,33 @@ async def _save_cursor(self, stream_key: str, cursor: str) -> None:
# Stream consumption
# ------------------------------------------------------------------

async def consume_feedback_stream(self, date_key: Optional[str] = None) -> int:
"""Consume new events from rag:feedback:{date_key} and distil patterns.
async def consume_feedback_stream(
self,
date_key: Optional[str] = None,
user_id: Optional[str] = None,
) -> int:
"""Consume new events from rag:feedback:{user_id}:{date_key} and distil patterns.

Issue #3240: feedback streams are now namespaced by user_id so per-user
retrieval patterns are learned independently. When user_id is None the
global sentinel ``__global__`` is used, which preserves backward
compatibility with system-level schedulers.

Uses a per-stream cursor so repeated scheduler calls only read NEW
entries. Mirrors EdgeLearner.consume_feedback_stream() design.

Args:
date_key: UTC date string YYYY-MM-DD. Defaults to today.
user_id: Authenticated user identifier. Defaults to global scope.

Returns:
Number of new events processed.
"""
if date_key is None:
date_key = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d")

stream_key = f"rag:feedback:{date_key}"
uid = user_id or GLOBAL_USER
stream_key = f"rag:feedback:{uid}:{date_key}"
await self._load_cursors()

resume_id = self._cursors.get(stream_key, "0-0")
Expand All @@ -222,7 +238,7 @@ async def consume_feedback_stream(self, date_key: Optional[str] = None) -> int:
break

for entry_id, fields in entries:
await self._process_feedback_event(fields)
await self._process_feedback_event(fields, user_id=uid)
ts, seq = (
entry_id.decode() if isinstance(entry_id, bytes) else entry_id
).split("-")
Expand All @@ -249,8 +265,13 @@ async def consume_feedback_stream(self, date_key: Optional[str] = None) -> int:
# Event processing & pattern distillation
# ------------------------------------------------------------------

async def _process_feedback_event(self, fields: Dict) -> None:
"""Score a single feedback event and distil a pattern if successful."""
async def _process_feedback_event(
self, fields: Dict, user_id: str = GLOBAL_USER
) -> None:
"""Score a single feedback event and distil a pattern if successful.

Issue #3240: user_id scopes the distilled pattern to the originating user.
"""

def _decode(v):
return v.decode("utf-8") if isinstance(v, bytes) else v
Expand All @@ -275,7 +296,7 @@ def _decode(v):
categories = _extract_categories(ranked_ids)
strategy_hints = _build_strategy_hints(complexity, len(ranked_ids))

await self._distil_pattern(complexity, categories, strategy_hints)
await self._distil_pattern(complexity, categories, strategy_hints, user_id=user_id)

@staticmethod
def _score_trajectory(retrieved_ids: List[str], ranked_ids: List[str]) -> bool:
Expand Down Expand Up @@ -307,14 +328,17 @@ async def _distil_pattern(
query_type: str,
categories: List[str],
strategy_hints: Dict[str, str],
user_id: str = GLOBAL_USER,
) -> None:
"""Upsert a retrieval pattern in Redis.

The pattern hash is derived from (query_type, sorted categories) so
that semantically identical patterns always map to the same key.
Issue #3240: redis_key is namespaced by user_id so each user's patterns
are stored independently. The pattern hash is derived from
(query_type, sorted categories) so semantically identical patterns from
the same user map to the same key.
"""
pattern_hash = _compute_pattern_hash(query_type, categories)
redis_key = f"{_PATTERN_KEY_PREFIX}{pattern_hash}"
redis_key = f"{_PATTERN_KEY_PREFIX}{user_id}:{pattern_hash}"

try:
redis = await self._get_redis()
Expand Down Expand Up @@ -354,12 +378,15 @@ async def get_matching_pattern(
query: str,
complexity: str = "simple",
categories: Optional[List[str]] = None,
user_id: Optional[str] = None,
) -> Optional[RetrievalPattern]:
"""Return the best matching historical pattern for a query, or None.

Matching strategy (fast, no embedding required):
1. Exact hash match on (complexity, sorted categories).
2. Complexity-only match (ignore categories) when no exact match.
Issue #3240: Matching is attempted in order:
1. User-scoped exact hash on (complexity, sorted categories).
2. User-scoped complexity-only hash (ignore categories).
3. Global exact hash — fallback when the user has no patterns yet.
4. Global complexity-only hash — final fallback.

Only returns patterns with success_rate >= 0.6 and usage_count >= 3
to avoid acting on sparse evidence.
Expand All @@ -369,55 +396,78 @@ async def get_matching_pattern(
reserved for future embedding-based matching).
complexity: QueryComplexity.value string.
categories: Optional category list from the calling context.
user_id: Authenticated user identifier for per-user scope.
Falls back to global patterns when None or when the
user has no qualifying patterns.

Returns:
Best matching RetrievalPattern or None.
"""
_ = query # reserved for future embedding-based lookup
cats = sorted(categories) if categories else []
uid = user_id or GLOBAL_USER

candidates = [
_compute_pattern_hash(complexity, cats),
_compute_pattern_hash(complexity, []), # complexity-only fallback
exact_hash = _compute_pattern_hash(complexity, cats)
complexity_hash = _compute_pattern_hash(complexity, [])

# Build candidate list: user-scoped first, then global fallback.
candidates: List[str] = [
f"{_PATTERN_KEY_PREFIX}{uid}:{exact_hash}",
f"{_PATTERN_KEY_PREFIX}{uid}:{complexity_hash}",
]
if uid != GLOBAL_USER:
# Append global fallback candidates (Issue #3240).
candidates.append(f"{_PATTERN_KEY_PREFIX}{GLOBAL_USER}:{exact_hash}")
candidates.append(f"{_PATTERN_KEY_PREFIX}{GLOBAL_USER}:{complexity_hash}")

try:
redis = await self._get_redis()
for ph in candidates:
redis_key = f"{_PATTERN_KEY_PREFIX}{ph}"
for redis_key in candidates:
raw = await redis.hgetall(redis_key)
if not raw:
continue
pattern = RetrievalPattern.from_redis_mapping(raw)
if pattern.success_rate >= 0.6 and pattern.usage_count >= 3:
logger.debug(
"RetrievalLearner: matched pattern %s (rate=%.2f, usage=%d)",
ph,
"RetrievalLearner: matched pattern %s (rate=%.2f, usage=%d, key=%s)",
pattern.pattern_hash,
pattern.success_rate,
pattern.usage_count,
redis_key,
)
return pattern
except Exception as exc:
logger.warning("RetrievalLearner: get_matching_pattern failed: %s", exc)

return None

async def record_pattern_outcome(self, pattern_hash: str, success: bool) -> None:
async def record_pattern_outcome(
self,
pattern_hash: str,
success: bool,
user_id: Optional[str] = None,
) -> None:
"""Update the success_rate of an existing pattern with a new outcome.

Issue #3240: user_id must match the scope used when the pattern was
matched so the correct namespaced Redis key is updated. Falls back to
the global scope when user_id is None.

Uses EMA (alpha=0.1) so that a single bad outcome does not discard an
otherwise reliable pattern.

Args:
pattern_hash: Hash key returned by get_matching_pattern().
success: True if the retrieval led to a satisfactory response.
user_id: User scope; use None for global/system scope.
"""
redis_key = f"{_PATTERN_KEY_PREFIX}{pattern_hash}"
uid = user_id or GLOBAL_USER
redis_key = f"{_PATTERN_KEY_PREFIX}{uid}:{pattern_hash}"
try:
redis = await self._get_redis()
raw = await redis.hgetall(redis_key)
if not raw:
logger.debug("RetrievalLearner: no pattern found for %s", pattern_hash)
logger.debug("RetrievalLearner: no pattern found for %s", redis_key)
return
pattern = RetrievalPattern.from_redis_mapping(raw)
signal = 1.0 if success else 0.0
Expand Down
Loading
Loading