Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions dspy/adapters/types/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,27 @@ def __repr__(self) -> str:
length = len(self.data)
return f"Audio(data=<AUDIO_BASE_64_ENCODED({length})>, audio_format='{self.audio_format}')"

# RLM Sandbox Support

def rlm_preview(self, max_chars: int = 500) -> str:
    """Build a short, LLM-friendly descriptor of this Audio object.

    The descriptor reports only the audio format and the length of the
    base64 payload; the payload itself is never included.

    Args:
        max_chars: Accepted for signature compatibility with other
            previewable types; this implementation does not use it.

    Returns:
        A string of the form ``<Audio: format=..., N base64 chars>``.
    """
    audio_format = self.audio_format
    encoded_length = len(self.data)
    return f"<Audio: format={audio_format}, {encoded_length} base64 chars>"

def to_sandbox(self) -> bytes:
    """Serialize Audio for sandbox injection (descriptor string, not raw data).

    Audio data cannot be meaningfully processed as code in the sandbox.
    The agent should use llm_query() with multimodal content to perceive audio.

    Returns:
        The UTF-8 encoded output of ``rlm_preview()`` called with its
        default arguments.
    """
    descriptor = self.rlm_preview()
    return descriptor.encode("utf-8")

def sandbox_setup(self) -> str:
    """Return the setup code to run in the sandbox for Audio values.

    Returns:
        An empty string: this type contributes no setup code.
    """
    no_setup_code = ""
    return no_setup_code

def sandbox_assignment(self, var_name: str, data_expr: str) -> str:
    """Return code that assigns the audio descriptor string in the sandbox.

    Args:
        var_name: Name of the variable on the left-hand side.
        data_expr: Source text of the expression on the right-hand side.

    Returns:
        A single assignment statement, ``<var_name> = <data_expr>``.
    """
    assignment = f"{var_name} = {data_expr}"
    return assignment

def encode_audio(audio: Union[str, bytes, dict, "Audio", Any], sampling_rate: int = 16000, format: str = "wav") -> dict:
"""
Encode audio to a dict with 'data' and 'audio_format'.
Expand Down
10 changes: 10 additions & 0 deletions dspy/adapters/types/base_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ def parse_lm_response(cls, response: str | dict[str, Any]) -> Optional["Type"]:
"""
return None

# RLM Sandbox Support
#
# To opt in to RLM sandbox injection, subclasses should implement:
# sandbox_setup() -> str (imports needed in sandbox)
# to_sandbox() -> bytes (serialize for injection)
# sandbox_assignment(var_name, data_expr) -> str (reconstruction code)
# rlm_preview(max_chars) -> str (LLM-friendly preview)
#
# See dspy.DataFrame for a reference implementation.


def split_message_content_for_custom_types(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Split user message content into a list of content blocks.
Expand Down
25 changes: 25 additions & 0 deletions dspy/adapters/types/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,31 @@ def __repr__(self):
return f"Image(url=data:image/{image_type};base64,<IMAGE_BASE_64_ENCODED({len_base64!s})>)"
return f"Image(url='{self.url}')"

# RLM Sandbox Support

def rlm_preview(self, max_chars: int = 500) -> str:
    """Generate an LLM-friendly preview of the Image contents.

    Base64 data URLs are summarized by image format and payload length so
    the encoded bytes never reach the prompt. Any other URL is shown
    directly, truncated to ``max_chars`` characters.

    Args:
        max_chars: Maximum number of URL characters to include when the
            image is referenced by a plain (non-base64) URL.

    Returns:
        ``<Image: format=..., N base64 chars>`` for base64 data URLs,
        otherwise ``<Image: url=...>``.
    """
    # Split on the full "base64," marker rather than testing for the bare
    # substring "base64": a plain URL such as https://host/base64/cat.png
    # contains the substring but has no payload to measure, and indexing
    # the split result for it would raise IndexError.
    _, marker, payload = self.url.partition("base64,")
    if marker:
        # "data:image/png;base64,..." -> "data:image/png" -> "png"
        image_type = self.url.split(";")[0].split("/")[-1]
        return f"<Image: format={image_type}, {len(payload)} base64 chars>"
    return f"<Image: url={self.url[:max_chars]}>"

def to_sandbox(self) -> bytes:
    """Serialize Image for sandbox injection (descriptor string, not raw data).

    Image data cannot be meaningfully processed as code in the sandbox.
    The agent should use llm_query() with multimodal content to perceive images.

    Returns:
        The UTF-8 encoded output of ``rlm_preview()`` called with its
        default arguments.
    """
    descriptor = self.rlm_preview()
    return descriptor.encode("utf-8")

def sandbox_setup(self) -> str:
    """Return the setup code to run in the sandbox for Image values.

    Returns:
        An empty string: this type contributes no setup code.
    """
    no_setup_code = ""
    return no_setup_code

def sandbox_assignment(self, var_name: str, data_expr: str) -> str:
    """Return code that assigns the image descriptor string in the sandbox.

    Args:
        var_name: Name of the variable on the left-hand side.
        data_expr: Source text of the expression on the right-hand side.

    Returns:
        A single assignment statement, ``<var_name> = <data_expr>``.
    """
    assignment = f"{var_name} = {data_expr}"
    return assignment


def is_url(string: str) -> bool:
"""Check if a string is a valid URL."""
Expand Down
567 changes: 541 additions & 26 deletions dspy/predict/rlm.py

Large diffs are not rendered by default.

175 changes: 175 additions & 0 deletions dspy/primitives/local_interpreter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
"""
Unsandboxed local Python interpreter for RLM.

Implements the CodeInterpreter protocol but executes code directly in the host
Python process via exec(). This gives the RLM agent full access to any installed
Python package (PIL, pydub, numpy, scipy, etc.).

Use this when the sandboxed PythonInterpreter (Deno/Pyodide) is too restrictive —
e.g., when the RLM agent needs to manipulate images with PIL or process audio
with pydub directly in its generated code.

Security: This is intentionally UNSANDBOXED. The LLM-generated code runs with
full host process privileges. Only use for local experiments or trusted workloads.

Usage:
from dspy.primitives.local_interpreter import LocalInterpreter

rlm = dspy.RLM("context -> answer", interpreter=LocalInterpreter())
"""

import io
import sys
import traceback
from typing import Any, Callable

from dspy.primitives.code_interpreter import CodeInterpreterError, FinalOutput


class _SubmitCalledError(BaseException):
    """Internal signal raised when SUBMIT() is called in user code.

    Derives from BaseException rather than Exception so that a broad
    ``except Exception:`` in the executed code cannot swallow the signal
    and silently discard the submitted result.

    Attributes are documented inline below.
    """

    def __init__(self, output: Any):
        # Forward the payload so ``args`` and ``str()`` carry it, as they
        # would for any exception constructed with one argument.
        super().__init__(output)
        # The value handed to SUBMIT(), read back by whoever catches this.
        self.output = output


class LocalInterpreter:
"""Unsandboxed Python interpreter implementing the CodeInterpreter protocol.

Executes code directly in the host process via exec(). State persists
across execute() calls within a session. Tools are injected as callable
functions in the execution namespace.

This gives the RLM agent full access to the host Python environment:
- PIL/Pillow for image manipulation
- pydub/ffmpeg for audio manipulation
- numpy, scipy, scikit-image, etc.
- Any installed Python package

Note: Not thread-safe. Create separate instances for concurrent use.
"""

def __init__(
self,
tools: dict[str, Callable[..., str]] | None = None,
output_fields: list[dict] | None = None,
):
"""
Args:
tools: Dictionary mapping tool names to callable functions.
Tools are available as top-level functions in the namespace.
output_fields: Output field definitions for typed SUBMIT signature.
"""
self._tools: dict[str, Callable[..., str]] = dict(tools) if tools else {}
self.output_fields = output_fields
self._namespace: dict[str, Any] = {}
self._started = False

@property
def tools(self) -> dict[str, Callable[..., str]]:
"""Tools available for interpreter code to call."""
return self._tools

@tools.setter
def tools(self, value: dict[str, Callable[..., str]]) -> None:
self._tools = value

def start(self) -> None:
"""Initialize the interpreter namespace."""
if self._started:
return
self._namespace = {"__builtins__": __builtins__}
self._started = True

def execute(
self,
code: str,
variables: dict[str, Any] | None = None,
) -> Any:
"""Execute Python code in the host process.

Args:
code: Python code to execute.
variables: Variables to inject into the namespace before execution.
Media objects (Audio, Image) are injected AS-IS, giving
code direct access to their data for manipulation.

Returns:
- FinalOutput: If SUBMIT() was called
- str: Captured stdout (from print() calls)
- None: If no output was produced

Raises:
CodeInterpreterError: On runtime errors
SyntaxError: On invalid Python syntax
"""
if not self._started:
self.start()

# Inject variables directly into namespace (no serialization — objects stay live)
if variables:
self._namespace.update(variables)

# Inject tools as callable functions
for name, func in self._tools.items():
self._namespace[name] = func

# Inject SUBMIT function — maps args to output field names (matching PythonInterpreter)
output_fields = self.output_fields or []
field_names = [f["name"] for f in output_fields]

def SUBMIT(*args, **kwargs): # noqa: N802
if not args and not kwargs:
raise ValueError("SUBMIT requires at least one argument")
if args and kwargs:
raise ValueError("SUBMIT accepts either positional args or keyword args, not both")
if kwargs:
output = kwargs
elif field_names:
if len(args) != len(field_names):
expected = ", ".join(field_names)
raise TypeError(
f"SUBMIT() takes {len(field_names)} positional argument(s) "
f"({expected}), but {len(args)} were given"
)
output = dict(zip(field_names, args, strict=False))
elif len(args) == 1:
output = {"output": args[0]}
else:
output = {"output": args}
raise _SubmitCalledError(output)

self._namespace["SUBMIT"] = SUBMIT

# Capture stdout
old_stdout = sys.stdout
captured = io.StringIO()
sys.stdout = captured

try:
exec(code, self._namespace)
except _SubmitCalledError as e:
return FinalOutput(e.output)
except SyntaxError:
raise
except Exception as e:
tb = traceback.format_exc()
raise CodeInterpreterError(f"{type(e).__name__}: {e}\n{tb}") from e
finally:
sys.stdout = old_stdout

output = captured.getvalue()
if output:
return output.rstrip("\n")
return None

def shutdown(self) -> None:
"""Release resources and clear the namespace."""
self._namespace.clear()
self._started = False

def __enter__(self):
self.start()
return self

def __exit__(self, *args):
self.shutdown()
Loading