Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 47 additions & 4 deletions docs/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -3691,15 +3691,19 @@
"description": "URL of the referenced document"
},
"doc_title": {
"type": "string",
"anyOf": [
{
"type": "string"
},
{
"type": "null"
}
],
"title": "Doc Title",
"description": "Title of the referenced document"
}
},
"type": "object",
"required": [
"doc_title"
],
"title": "ReferencedDocument",
"description": "Model representing a document referenced in generating a response.\n\nAttributes:\n doc_url: Url to the referenced doc.\n doc_title: Title of the referenced doc."
},
Expand Down Expand Up @@ -3969,6 +3973,45 @@
"title": "ToolsResponse",
"description": "Model representing a response to tools request."
},
"UnauthorizedResponse": {
"properties": {
"tools": {
"items": {
"additionalProperties": true,
"type": "object"
},
"type": "array",
"title": "Tools",
"description": "List of tools available from all configured MCP servers and built-in toolgroups",
"examples": [
[
{
"description": "Read contents of a file from the filesystem",
"identifier": "filesystem_read",
"parameters": [
{
"description": "Path to the file to read",
"name": "path",
"parameter_type": "string",
"required": true
}
],
"provider_id": "model-context-protocol",
"server_source": "http://localhost:3000",
"toolgroup_id": "filesystem-tools",
"type": "tool"
}
]
]
}
},
"type": "object",
"required": [
"tools"
],
"title": "ToolsResponse",
"description": "Model representing a response to tools request."
},
"UnauthorizedResponse": {
"properties": {
"detail": {
Expand Down
20 changes: 16 additions & 4 deletions src/app/endpoints/conversations_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,13 +314,25 @@ def check_conversation_existence(user_id: str, conversation_id: str) -> None:

def transform_chat_message(entry: CacheEntry) -> dict[str, Any]:
    """Transform a cached conversation entry into the response payload format.

    Args:
        entry: Cache entry holding the user query, the assistant response,
            provider/model identifiers, start/end timestamps, and optionally
            the documents referenced while generating the response.

    Returns:
        Dict with ``provider``, ``model``, the user/assistant ``messages``
        pair, and the ``started_at``/``completed_at`` timestamps.
    """
    user_message = {"content": entry.query, "type": "user"}
    assistant_message: dict[str, Any] = {
        "content": entry.response,
        "type": "assistant",
    }

    # Attach referenced_documents only when present so entries stored before
    # this field existed keep their original payload shape.
    if entry.referenced_documents is not None:
        assistant_message["referenced_documents"] = [
            doc.model_dump(mode="json") for doc in entry.referenced_documents
        ]

    return {
        "provider": entry.provider,
        "model": entry.model,
        "messages": [user_message, assistant_message],
        "started_at": entry.started_at,
        "completed_at": entry.completed_at,
    }
20 changes: 14 additions & 6 deletions src/app/endpoints/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from llama_stack_client.types.model_list_response import ModelListResponse
from llama_stack_client.types.shared.interleaved_content_item import TextContentItem
from llama_stack_client.types.tool_execution_step import ToolExecutionStep
from pydantic import AnyUrl

import constants
import metrics
Expand All @@ -31,6 +32,7 @@
from authorization.middleware import authorize
from client import AsyncLlamaStackClientHolder
from configuration import configuration
from models.cache_entry import CacheEntry
from models.config import Action
from models.database.conversations import UserConversation
from models.requests import Attachment, QueryRequest
Expand Down Expand Up @@ -331,16 +333,22 @@ async def query_endpoint_handler( # pylint: disable=R0914
)

completed_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")

cache_entry = CacheEntry(
query=query_request.query,
response=summary.llm_response,
provider=provider_id,
model=model_id,
started_at=started_at,
completed_at=completed_at,
referenced_documents=referenced_documents if referenced_documents else None
)

store_conversation_into_cache(
configuration,
user_id,
conversation_id,
provider_id,
model_id,
query_request.query,
summary.llm_response,
started_at,
completed_at,
cache_entry,
_skip_userid_check,
topic_summary,
)
Expand Down
25 changes: 18 additions & 7 deletions src/app/endpoints/streaming_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from llama_stack_client.types.shared import ToolCall
from llama_stack_client.types.shared.interleaved_content_item import TextContentItem
from pydantic import AnyUrl

from app.database import get_session
from app.endpoints.query import (
Expand All @@ -43,12 +44,14 @@
from constants import DEFAULT_RAG_TOOL, MEDIA_TYPE_JSON, MEDIA_TYPE_TEXT
import metrics
from metrics.utils import update_llm_token_count_from_turn
from models.cache_entry import CacheEntry
from models.config import Action
from models.database.conversations import UserConversation
from models.requests import QueryRequest
from models.responses import ForbiddenResponse, UnauthorizedResponse
from models.responses import ForbiddenResponse, UnauthorizedResponse, ReferencedDocument
from utils.endpoints import (
check_configuration_loaded,
create_referenced_documents_with_metadata,
create_rag_chunks_dict,
get_agent,
get_system_prompt,
Expand Down Expand Up @@ -863,16 +866,24 @@ async def response_generator(
)

completed_at = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%SZ")

referenced_documents = create_referenced_documents_with_metadata(summary, metadata_map)

cache_entry = CacheEntry(
query=query_request.query,
response=summary.llm_response,
provider=provider_id,
model=model_id,
started_at=started_at,
completed_at=completed_at,
referenced_documents=referenced_documents if referenced_documents else None
)

store_conversation_into_cache(
configuration,
user_id,
conversation_id,
provider_id,
model_id,
query_request.query,
summary.llm_response,
started_at,
completed_at,
cache_entry,
_skip_userid_check,
topic_summary,
)
Expand Down
3 changes: 2 additions & 1 deletion src/cache/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from abc import ABC, abstractmethod

from models.cache_entry import CacheEntry, ConversationData
from models.cache_entry import CacheEntry
from models.responses import ConversationData
from utils.suid import check_suid


Expand Down
3 changes: 2 additions & 1 deletion src/cache/in_memory_cache.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""In-memory cache implementation."""

from cache.cache import Cache
from models.cache_entry import CacheEntry, ConversationData
from models.cache_entry import CacheEntry
from models.config import InMemoryCacheConfig
from models.responses import ConversationData
from log import get_logger
from utils.connection_decorator import connection

Expand Down
3 changes: 2 additions & 1 deletion src/cache/noop_cache.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""No-operation cache implementation."""

from cache.cache import Cache
from models.cache_entry import CacheEntry, ConversationData
from models.cache_entry import CacheEntry
from models.responses import ConversationData
from log import get_logger
from utils.connection_decorator import connection

Expand Down
64 changes: 40 additions & 24 deletions src/cache/postgres_cache.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""PostgreSQL cache implementation."""

import json
import psycopg2

from cache.cache import Cache
from cache.cache_error import CacheError
from models.cache_entry import CacheEntry, ConversationData
from models.cache_entry import CacheEntry
from models.config import PostgreSQLDatabaseConfiguration
from models.responses import ConversationData, ReferencedDocument
from log import get_logger
from utils.connection_decorator import connection

Expand All @@ -18,17 +20,18 @@ class PostgresCache(Cache):
The cache itself lives stored in following table:

```
Column | Type | Nullable |
-----------------+--------------------------------+----------+
user_id | text | not null |
conversation_id | text | not null |
created_at | timestamp without time zone | not null |
started_at | text | |
completed_at | text | |
query | text | |
response | text | |
provider | text | |
model | text | |
Column | Type | Nullable |
-----------------------+--------------------------------+----------+
user_id | text | not null |
conversation_id | text | not null |
created_at | timestamp without time zone | not null |
started_at | text | |
completed_at | text | |
query | text | |
response | text | |
provider | text | |
model | text | |
referenced_documents | jsonb | |
Indexes:
"cache_pkey" PRIMARY KEY, btree (user_id, conversation_id, created_at)
"timestamps" btree (created_at)
Expand All @@ -37,15 +40,16 @@ class PostgresCache(Cache):

CREATE_CACHE_TABLE = """
CREATE TABLE IF NOT EXISTS cache (
user_id text NOT NULL,
conversation_id text NOT NULL,
created_at timestamp NOT NULL,
started_at text,
completed_at text,
query text,
response text,
provider text,
model text,
user_id text NOT NULL,
conversation_id text NOT NULL,
created_at timestamp NOT NULL,
started_at text,
completed_at text,
query text,
response text,
provider text,
model text,
referenced_documents jsonb,
PRIMARY KEY(user_id, conversation_id, created_at)
);
"""
Comment on lines 41 to 55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add migration for the new referenced_documents column.

CREATE TABLE IF NOT EXISTS won't add the new column to existing PostgreSQL databases. On upgraded installations, INSERT/SELECT operations will fail with "column does not exist" errors.

Add migration logic in initialize_cache():

     def initialize_cache(self) -> None:
         """Initialize cache - clean it up etc."""
         if self.connection is None:
             logger.error("Cache is disconnected")
             raise CacheError("Initialize_cache: cache is disconnected")

         # cursor as context manager is not used there on purpose
        # any CREATE statement can raise its own exception
         # and it should not interfere with other statements
         cursor = self.connection.cursor()

         logger.info("Initializing table for cache")
         cursor.execute(PostgresCache.CREATE_CACHE_TABLE)
+
+        # Ensure referenced_documents column exists on upgrades
+        try:
+            cursor.execute("""
+                SELECT column_name 
+                FROM information_schema.columns 
+                WHERE table_name='cache' AND column_name='referenced_documents'
+            """)
+            if not cursor.fetchone():
+                logger.info("Adding missing 'referenced_documents' column to cache")
+                cursor.execute(
+                    "ALTER TABLE cache ADD COLUMN referenced_documents jsonb"
+                )
+        except Exception as e:
+            logger.error("Failed to ensure referenced_documents column: %s", e)
+            raise

         logger.info("Initializing table for conversations")
         cursor.execute(PostgresCache.CREATE_CONVERSATIONS_TABLE)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/cache/postgres_cache.py around lines 41 to 55, the CREATE TABLE SQL adds
referenced_documents but existing DBs won't get the new column; update
initialize_cache() to run a migration that alters the existing table to add the
referenced_documents jsonb column if it does not exist before any INSERT/SELECT.
Implement an idempotent ALTER TABLE ... ADD COLUMN IF NOT EXISTS
referenced_documents jsonb (or equivalent conditional check), run it inside the
same connection/transaction used for initialization, handle/log any errors, and
keep the CREATE TABLE statement for fresh installs.

Expand All @@ -66,16 +70,16 @@ class PostgresCache(Cache):
"""

SELECT_CONVERSATION_HISTORY_STATEMENT = """
SELECT query, response, provider, model, started_at, completed_at
SELECT query, response, provider, model, started_at, completed_at, referenced_documents
FROM cache
WHERE user_id=%s AND conversation_id=%s
ORDER BY created_at
"""

INSERT_CONVERSATION_HISTORY_STATEMENT = """
INSERT INTO cache(user_id, conversation_id, created_at, started_at, completed_at,
query, response, provider, model)
VALUES (%s, %s, CURRENT_TIMESTAMP, %s, %s, %s, %s, %s, %s)
query, response, provider, model, referenced_documents)
VALUES (%s, %s, CURRENT_TIMESTAMP, %s, %s, %s, %s, %s, %s, %s)
"""

QUERY_CACHE_SIZE = """
Expand Down Expand Up @@ -211,13 +215,19 @@ def get(

result = []
for conversation_entry in conversation_entries:
# Deserialize the stored JSONB payload back into ReferencedDocument objects
docs_data = conversation_entry[6]
docs_obj = None
if docs_data:
docs_obj = [ReferencedDocument.model_validate(doc) for doc in docs_data]
cache_entry = CacheEntry(
query=conversation_entry[0],
response=conversation_entry[1],
provider=conversation_entry[2],
model=conversation_entry[3],
started_at=conversation_entry[4],
completed_at=conversation_entry[5],
referenced_documents=docs_obj,
)
result.append(cache_entry)

Expand Down Expand Up @@ -245,6 +255,11 @@ def insert_or_append(
raise CacheError("insert_or_append: cache is disconnected")

try:
referenced_documents_json = None
if cache_entry.referenced_documents:
docs_as_dicts = [doc.model_dump(mode='json') for doc in cache_entry.referenced_documents]
referenced_documents_json = json.dumps(docs_as_dicts)

# the whole operation is run in one transaction
with self.connection.cursor() as cursor:
cursor.execute(
Expand All @@ -258,6 +273,7 @@ def insert_or_append(
cache_entry.response,
cache_entry.provider,
cache_entry.model,
referenced_documents_json,
),
)

Expand Down
Loading
Loading