22 changes: 15 additions & 7 deletions kedro-agentic-workflows/conf/base/catalog.yml
@@ -42,18 +42,26 @@ response_user_prompt_txt:
filepath: data/response_generation/prompts/response_user.txt

intent_prompt_langfuse:
type: kedro_agentic_workflows.datasets.langfuse_prompt_dataset.PromptDataset
type: kedro_agentic_workflows.datasets.langfuse_prompt_dataset.LangfusePromptDataset
filepath: data/intent_detection/prompts/intent_prompt_langfuse.json
prompt_name: "intent-classifier"
prompt_type: "chat"
credentials: langfuse_credentials
sync_policy: local # or strict or remote
mode: langchain # or sdk

intent_prompt_opik:
type: kedro_agentic_workflows.datasets.opik_prompt_dataset.PromptDataset
filepath: data/intent_detection/prompts/intent_prompt_opik.json
prompt_name: "intent-classifier"
prompt_type: "chat"
credentials: opik_credentials
# --- Langfuse Tracing ---
intent_tracer_langfuse:
type: kedro_agentic_workflows.datasets.langfuse_trace_dataset.LangfuseTraceDataset
credentials: langfuse_credentials
mode: langchain # or openai or sdk

# intent_prompt_opik:
# type: kedro_agentic_workflows.datasets.opik_prompt_dataset.PromptDataset
# filepath: data/intent_detection/prompts/intent_prompt_opik.json
# prompt_name: "intent-classifier"
# prompt_type: "chat"
# credentials: opik_credentials


# --- Default dataset ---
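The `credentials: langfuse_credentials` key above references a Kedro credentials entry rather than inline secrets. A minimal sketch of the matching entry in `conf/local/credentials.yml`, with the three keys the dataset reads (the values here are placeholders):

```yaml
# conf/local/credentials.yml -- placeholder values, keep out of version control
langfuse_credentials:
  public_key: pk-lf-...
  secret_key: sk-lf-...
  host: https://cloud.langfuse.com  # or a self-hosted Langfuse URL
```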
kedro-agentic-workflows/src/kedro_agentic_workflows/datasets/langfuse_prompt_dataset.py
@@ -24,14 +24,19 @@ def _get_content(data: str | list) -> str:
return "\n".join(msg["content"] for msg in data)


class PromptDataset(AbstractDataset):
class LangfusePromptDataset(AbstractDataset):
"""
Kedro dataset for managing LangChain prompts with Langfuse versioning.
Kedro dataset for managing prompts with Langfuse versioning.

Behavior:
- On save: writes prompt JSON to disk and creates/updates it in Langfuse.
- On load: prefers local file, syncs with Langfuse, ensures reproducibility.
- Returns LangChain `ChatPromptTemplate` when loaded.
- On load: syncs based on sync_policy, ensures reproducibility.
- Returns LangChain `ChatPromptTemplate` (langchain mode) or raw Langfuse object (sdk mode).

Sync policies:
- local: local file takes precedence (default)
- remote: Langfuse version takes precedence (errors if remote doesn't exist)
- strict: error if local and remote differ
"""

def __init__(
@@ -40,13 +45,17 @@ def __init__(
prompt_name: str,
credentials: dict[str, Any],
prompt_type: Literal["chat", "text"] = "text",
sync_policy: Literal["local", "remote", "strict"] = "local",
mode: Literal["langchain", "sdk"] = "langchain",
):
"""
Args:
filepath: Local JSON file path for storing prompt.
prompt_name: Unique identifier for the prompt in Langfuse.
prompt_type: Either "chat" or "text".
credentials: Dict with Langfuse credentials {public_key, secret_key, host}.
sync_policy: How to handle conflicts - "local", "remote", or "strict".
mode: Return type - "langchain" for ChatPromptTemplate or "sdk" for raw object.
"""
self._filepath = Path(filepath)
self._prompt_name = prompt_name
@@ -56,6 +65,8 @@ def __init__(
secret_key=credentials["secret_key"],
host=credentials["host"],
)
self._sync_policy = sync_policy
self._mode = mode

def _describe(self):
return {"filepath": self._filepath, "prompt_name": self._prompt_name}
@@ -79,14 +90,43 @@ def _sync_with_langfuse(
self, local_data: str | None, langfuse_prompt: Any | None
) -> Any:
"""
Ensure local file and Langfuse prompt are consistent.
Ensure local file and Langfuse prompt are consistent based on sync_policy.
Returns latest Langfuse prompt after synchronization.

        Cases handled under the default "local" policy:
- Local exists but not in Langfuse → push local to Langfuse.
- Both exist but differ → update Langfuse with local.
- Only Langfuse exists → write to local.
"""
# Handle strict policy
# Error if:
# - Either local_data or langfuse_prompt does not exist
# - Both local_data and langfuse_prompt exist but differ
if self._sync_policy == "strict":
if not local_data or not langfuse_prompt:
                raise ValueError(
                    f"Strict sync policy specified for '{self._prompt_name}' "
                    "but the local file or the remote Langfuse prompt is missing."
                )

local_hash = _hash(_get_content(local_data))
remote_hash = _hash(_get_content(langfuse_prompt.prompt))
if local_hash != remote_hash:
raise ValueError(
f"Strict sync failed for '{self._prompt_name}': "
f"local and remote prompts differ. Use 'local' or 'remote' policy to resolve."
)
return langfuse_prompt

# Handle remote-first policy
if self._sync_policy == "remote":
if not langfuse_prompt:
raise ValueError(
f"Remote sync policy specified for '{self._prompt_name}' "
f"but no remote prompt exists in Langfuse. Create the prompt in Langfuse first."
)
if not local_data or _hash(_get_content(local_data)) != _hash(_get_content(langfuse_prompt.prompt)):
self._filepath.parent.mkdir(parents=True, exist_ok=True)
with open(self._filepath, "w", encoding="utf-8") as f:
json.dump(langfuse_prompt.prompt, f, indent=2)
return langfuse_prompt

# Default local-first policy
if local_data is not None:
if langfuse_prompt is None:
# Push local to Langfuse
@@ -95,7 +135,7 @@ def _sync_with_langfuse(
self._prompt_name, type=self._prompt_type, label="latest"
)

# If mismatch → update Langfuse
# If mismatch → update Langfuse with local
if _hash(_get_content(local_data)) != _hash(
_get_content(langfuse_prompt.prompt)
):
@@ -116,7 +156,7 @@ def _sync_with_langfuse(
f"No prompt found locally or in Langfuse for '{self._prompt_name}'"
)

def load(self) -> ChatPromptTemplate:
def load(self) -> ChatPromptTemplate | Any:
"""
Load prompt with synchronization logic.
        Returns LangChain `ChatPromptTemplate` (langchain mode) or the raw Langfuse prompt object (sdk mode).
@@ -135,4 +175,7 @@ def load(self) -> ChatPromptTemplate:

langfuse_prompt = self._sync_with_langfuse(local_data, langfuse_prompt)

return ChatPromptTemplate.from_messages(langfuse_prompt.get_langchain_prompt())
if self._mode == "sdk":
return langfuse_prompt
else:
return ChatPromptTemplate.from_messages(langfuse_prompt.get_langchain_prompt())
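As a usage sketch (the node name and template variable are assumptions for illustration, not part of this PR): with the catalog entry above in `langchain` mode, a Kedro node receives the already-synchronized prompt as a ready `ChatPromptTemplate`:

```python
# Hypothetical node consuming the intent_prompt_langfuse catalog entry.
from langchain_core.prompts import ChatPromptTemplate


def classify_intent(intent_prompt: ChatPromptTemplate, user_query: str) -> list:
    # Syncing with Langfuse already happened inside LangfusePromptDataset.load(),
    # so the node only renders the template (assuming it declares a
    # `query` input variable).
    return intent_prompt.format_messages(query=user_query)
```

In `sdk` mode the same node would instead receive the raw Langfuse prompt client and call `get_langchain_prompt()` (or use the object directly) itself.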
kedro-agentic-workflows/src/kedro_agentic_workflows/datasets/langfuse_trace_dataset.py
@@ -0,0 +1,61 @@
from typing import Any, Literal

from kedro.io import AbstractDataset
from langfuse import Langfuse


class LangfuseTraceDataset(AbstractDataset):
"""
Kedro dataset for managing Langfuse tracing clients and callbacks.
Returns appropriate tracing objects based on mode configuration.
Modes:
- langchain: Returns CallbackHandler for LangChain integration
- openai: Returns wrapped OpenAI client with automatic tracing
- sdk: Returns raw Langfuse client for manual tracing
"""

def __init__(
self,
credentials: dict[str, Any],
mode: Literal["langchain", "openai", "sdk"] = "sdk",
**trace_kwargs
):
"""
Args:
credentials: Dict with Langfuse credentials {public_key, secret_key, host}.
mode: Tracing mode - "langchain", "openai", or "sdk".
**trace_kwargs: Additional kwargs passed to the tracing client.
"""
self._credentials = credentials
self._mode = mode
self._trace_kwargs = trace_kwargs

def _describe(self):
return {"mode": self._mode, "credentials": "***"}

def load(self):
"""
Load appropriate tracing client based on mode.
Returns:
- langchain mode: CallbackHandler for LangChain callbacks
- openai mode: Wrapped OpenAI client with tracing
- sdk mode: Raw Langfuse client
"""
        if self._mode == "langchain":
            from langfuse.langchain import CallbackHandler

            # The handler resolves credentials via the global Langfuse client,
            # typically configured through LANGFUSE_* environment variables,
            # not through this dataset's credentials dict.
            return CallbackHandler()
        elif self._mode == "openai":
            from langfuse.openai import OpenAI

            # The drop-in OpenAI wrapper likewise reads LANGFUSE_* environment
            # variables for its tracing credentials.
            return OpenAI()
        else:
            return Langfuse(
                public_key=self._credentials["public_key"],
                secret_key=self._credentials["secret_key"],
                host=self._credentials["host"],
                **self._trace_kwargs,
            )

def save(self, data):
"""Tracing datasets are read-only."""
raise NotImplementedError("LangfuseTraceDataset is read-only - it provides tracing clients, not data storage")
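A sketch of wiring the `langchain`-mode tracer into a node (the chain, model, and input key are assumptions for illustration): the loaded `CallbackHandler` is passed through LangChain's standard `config={"callbacks": [...]}` mechanism:

```python
# Hypothetical node: prompt and llm come from other catalog entries;
# intent_tracer_langfuse is the CallbackHandler loaded by LangfuseTraceDataset.
def classify_with_tracing(prompt, llm, intent_tracer_langfuse):
    chain = prompt | llm
    # Every LLM call inside the chain is traced to Langfuse via the callback.
    return chain.invoke(
        {"query": "What are your opening hours?"},
        config={"callbacks": [intent_tracer_langfuse]},
    )
```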