diff --git a/backend/app/core/langfuse/langfuse.py b/backend/app/core/langfuse/langfuse.py
index 8fd0aa8b..a7fdbd97 100644
--- a/backend/app/core/langfuse/langfuse.py
+++ b/backend/app/core/langfuse/langfuse.py
@@ -1,10 +1,12 @@
 import uuid
 import logging
-from typing import Any, Dict, Optional
+from typing import Any, Callable, Dict, Optional
+from functools import wraps
 
 from asgi_correlation_id import correlation_id
 from langfuse import Langfuse
 from langfuse.client import StatefulGenerationClient, StatefulTraceClient
+from app.models.llm import CompletionConfig, QueryParams, LLMCallResponse
 
 logger = logging.getLogger(__name__)
 
@@ -107,3 +109,112 @@ def log_error(self, error_message: str, response_id: Optional[str] = None):
 
     def flush(self):
         self.langfuse.flush()
+
+
+def observe_llm_execution(
+    session_id: str | None = None,
+    credentials: dict | None = None,
+):
+    """Decorator to add Langfuse observability to LLM provider execute methods.
+
+    Args:
+        session_id: Session ID for grouping traces (conversation_id)
+        credentials: Langfuse credentials with public_key, secret_key, and host
+
+    Usage:
+        decorated_execute = observe_llm_execution(
+            credentials=langfuse_creds,
+            session_id=conversation_id
+        )(provider_instance.execute)
+    """
+
+    def decorator(func: Callable) -> Callable:
+        @wraps(func)
+        def wrapper(completion_config: CompletionConfig, query: QueryParams, **kwargs):
+            # Skip observability if no credentials provided
+            if not credentials or not all(
+                key in credentials for key in ["public_key", "secret_key", "host"]
+            ):
+                logger.info("[Langfuse] No credentials - skipping observability")
+                return func(completion_config, query, **kwargs)
+
+            try:
+                langfuse = Langfuse(
+                    public_key=credentials.get("public_key"),
+                    secret_key=credentials.get("secret_key"),
+                    host=credentials.get("host"),
+                )
+            except Exception as e:
+                logger.warning(f"[Langfuse] Failed to initialize client: {e}")
+                return func(completion_config, query, **kwargs)
+
+            trace_metadata = {
+                "provider": completion_config.provider,
+            }
+
+            if query.conversation and query.conversation.id:
+                trace_metadata["conversation_id"] = query.conversation.id
+
+            trace = langfuse.trace(
+                name="unified-llm-call",
+                input=query.input,
+                metadata=trace_metadata,
+                tags=[completion_config.provider],
+            )
+
+            generation = trace.generation(
+                name=f"{completion_config.provider}-completion",
+                input=query.input,
+                model=completion_config.params.get("model"),
+            )
+
+            try:
+                # Execute the actual LLM call
+                response: LLMCallResponse | None
+                error: str | None
+                response, error = func(completion_config, query, **kwargs)
+
+                if response:
+                    generation.end(
+                        output={
+                            "status": "success",
+                            "output": response.response.output.text,
+                        },
+                        usage_details={
+                            "input": response.usage.input_tokens,
+                            "output": response.usage.output_tokens,
+                        },
+                        model=response.response.model,
+                    )
+
+                    trace.update(
+                        output={
+                            "status": "success",
+                            "output": response.response.output.text,
+                        },
+                        session_id=session_id or response.response.conversation_id,
+                    )
+                else:
+                    error_msg = error or "Unknown error"
+                    generation.end(output={"error": error_msg})
+                    trace.update(
+                        output={"status": "failure", "error": error_msg},
+                        session_id=session_id,
+                    )
+
+                langfuse.flush()
+                return response, error
+
+            except Exception as e:
+                error_msg = str(e)
+                generation.end(output={"error": error_msg})
+                trace.update(
+                    output={"status": "failure", "error": error_msg},
+                    session_id=session_id,
+                )
+                langfuse.flush()
+                raise
+
+        return wrapper
+
+    return decorator
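A minimal sketch of the wrapper's pass-through contract, outside the diff: `execute` below is a hypothetical stand-in for a provider method, not code from this repository. It shows the two degradation paths the decorator guarantees (missing credentials and incomplete credentials) and that functools.wraps preserves the wrapped method's metadata.

from app.core.langfuse.langfuse import observe_llm_execution

def execute(completion_config, query, **kwargs):
    # Hypothetical stand-in for provider_instance.execute; mirrors the
    # (response, error) tuple contract the decorator expects.
    return None, "no provider configured"

# Missing credentials: the wrapper logs, skips tracing, and calls through.
wrapped = observe_llm_execution(credentials=None)(execute)
assert wrapped(completion_config=object(), query=object()) == (None, "no provider configured")

# Incomplete credentials (no secret_key/host) take the same fast path.
partial = observe_llm_execution(credentials={"public_key": "pk-..."})(execute)
assert partial(completion_config=object(), query=object()) == (None, "no provider configured")

# functools.wraps keeps the original name for logging and debugging.
assert wrapped.__name__ == "execute"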
diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py
index a8ad9d83..831f3cca 100644
--- a/backend/app/services/llm/jobs.py
+++ b/backend/app/services/llm/jobs.py
@@ -7,11 +7,13 @@
 
 from app.core.db import engine
 from app.crud.config import ConfigVersionCrud
+from app.crud.credentials import get_provider_credential
 from app.crud.jobs import JobCrud
 from app.models import JobStatus, JobType, JobUpdate, LLMCallRequest
 from app.models.llm.request import ConfigBlob, LLMCallConfig
 from app.utils import APIResponse, send_callback
 from app.celery.utils import start_high_priority_job
+from app.core.langfuse.langfuse import observe_llm_execution
 from app.services.llm.providers.registry import get_llm_provider
 
 
@@ -182,7 +184,25 @@ def execute_job(
         )
         return handle_job_error(job_id, request.callback_url, callback_response)
 
-    response, error = provider_instance.execute(
+    langfuse_credentials = get_provider_credential(
+        session=session,
+        org_id=organization_id,
+        project_id=project_id,
+        provider="langfuse",
+    )
+
+    # Extract conversation_id for langfuse session grouping
+    conversation_id = None
+    if request.query.conversation and request.query.conversation.id:
+        conversation_id = request.query.conversation.id
+
+    # Apply Langfuse observability decorator to provider execute method
+    decorated_execute = observe_llm_execution(
+        credentials=langfuse_credentials,
+        session_id=conversation_id,
+    )(provider_instance.execute)
+
+    response, error = decorated_execute(
         completion_config=config_blob.completion,
         query=request.query,
         include_provider_raw_response=request.include_provider_raw_response,
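A condensed, self-contained sketch of the new execute_job flow above: StubProvider is an illustrative stand-in for the instance returned by get_llm_provider, and the None credential value assumes get_provider_credential returns None when an org/project has no stored Langfuse credentials, so the decorated call degrades to a plain execute.

from app.core.langfuse.langfuse import observe_llm_execution

class StubProvider:
    # Illustrative stand-in for the provider resolved by get_llm_provider.
    def execute(self, completion_config, query, **kwargs):
        return None, "stub error"

provider_instance = StubProvider()
langfuse_credentials = None  # assumed return when Langfuse is not configured
conversation_id = None       # set from request.query.conversation.id when present

# Decoration happens per job rather than at import time, so each execution
# picks up the credentials stored for its own organization/project.
decorated_execute = observe_llm_execution(
    credentials=langfuse_credentials,
    session_id=conversation_id,
)(provider_instance.execute)

response, error = decorated_execute(completion_config=object(), query=object())
assert (response, error) == (None, "stub error")

Wrapping per call, rather than decorating the provider class once, is what lets credentials and session_id vary per organization, project, and conversation.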