diff --git a/src/deepagents/backends/__init__.py b/src/deepagents/backends/__init__.py index 352dd301..c8a4d9dd 100644 --- a/src/deepagents/backends/__init__.py +++ b/src/deepagents/backends/__init__.py @@ -1,18 +1,16 @@ """Memory backends for pluggable file storage.""" -from deepagents.backends.composite import CompositeBackend, CompositeStateBackendProvider +from deepagents.backends.composite import CompositeBackend from deepagents.backends.filesystem import FilesystemBackend -from deepagents.backends.state import StateBackend, StateBackendProvider -from deepagents.backends.store import StoreBackend, StoreBackendProvider -from deepagents.backends.protocol import BackendProtocol +from deepagents.backends.protocol import Backend, BackendProvider +from deepagents.backends.state import StateBackend +from deepagents.backends.store import StoreBackend __all__ = [ - "BackendProtocol", + "Backend", + "BackendProvider", "CompositeBackend", - "CompositeStateBackendProvider", "FilesystemBackend", "StateBackend", - "StateBackendProvider", "StoreBackend", - "StoreBackendProvider", ] diff --git a/src/deepagents/backends/composite.py b/src/deepagents/backends/composite.py index 04eb0822..754774f7 100644 --- a/src/deepagents/backends/composite.py +++ b/src/deepagents/backends/composite.py @@ -1,36 +1,45 @@ """CompositeBackend: Route operations to different backends based on path prefix.""" -from typing import Any, Literal, Optional, TYPE_CHECKING +from deepagents.backends.protocol import Backend, EditResult, WriteResult -from langchain.tools import ToolRuntime -from deepagents.backends.protocol import BackendProtocol, BackendProvider, StateBackendProvider, StateBackendProtocol -from deepagents.backends.state import StateBackend, StateBackendProvider -from langgraph.types import Command +class CompositeBackend(Backend): + """Composite backend that routes operations to different backends based on path prefix. + This backend allows combining multiple backends (checkpoint storage and external storage) + under different path prefixes. For example: + - /memories/* → StoreBackend (persistent across conversations) + - /* → StateBackend (checkpoint storage per conversation) + + Storage Model: Mixed (routes to checkpoint or external backends) + """ -class _CompositeBackend: - def __init__( self, - default: BackendProtocol | StateBackend, - routes: dict[str, BackendProtocol], + default: Backend, + routes: dict[str, Backend], ) -> None: + """Initialize CompositeBackend. + + Args: + default: Default backend for paths that don't match any route + routes: Dictionary mapping path prefixes to backends + """ # Default backend self.default = default - # Virtual routes + # Routed backends self.routes = routes - + # Sort routes by length (longest first) for correct prefix matching self.sorted_routes = sorted(routes.items(), key=lambda x: len(x[0]), reverse=True) - def _get_backend_and_key(self, key: str) -> tuple[BackendProtocol, str]: + def _get_backend_and_key(self, key: str) -> tuple[Backend, str]: """Determine which backend handles this key and strip prefix. - + Args: key: Original file path - + Returns: Tuple of (backend, stripped_key) where stripped_key has the route prefix removed (but keeps leading slash). @@ -40,17 +49,17 @@ def _get_backend_and_key(self, key: str) -> tuple[BackendProtocol, str]: if key.startswith(prefix): # Strip prefix but keep leading slash # e.g., "/memories/notes.txt" → "/notes.txt" - stripped_key = key[len(prefix) - 1:] if key[len(prefix) - 1:] else "/" + stripped_key = key[len(prefix) - 1 :] if key[len(prefix) - 1 :] else "/" return backend, stripped_key - + return self.default, key - + def ls(self, path: str) -> list[str]: """List files from backends, with appropriate prefixes. - + Args: path: Absolute path to directory. - + Returns: List of file paths with route prefixes added. """ @@ -58,21 +67,21 @@ def ls(self, path: str) -> list[str]: for route_prefix, backend in self.sorted_routes: if path.startswith(route_prefix.rstrip("/")): # Query only the matching routed backend - search_path = path[len(route_prefix) - 1:] + search_path = path[len(route_prefix) - 1 :] keys = backend.ls(search_path if search_path else "/") return [f"{route_prefix[:-1]}{key}" for key in keys] - + # Path doesn't match a route: query only default backend return self.default.ls(path) - + def read( - self, + self, file_path: str, offset: int = 0, limit: int = 2000, ) -> str: """Read file content, routing to appropriate backend. - + Args: file_path: Absolute file path offset: Line offset to start reading from (0-indexed) @@ -81,16 +90,16 @@ def read( """ backend, stripped_key = self._get_backend_and_key(file_path) return backend.read(stripped_key, offset=offset, limit=limit) - + def grep( self, pattern: str, - path: Optional[str] = None, - glob: Optional[str] = None, + path: str | None = None, + glob: str | None = None, output_mode: str = "files_with_matches", ) -> str: """Search for a pattern in files, routing to appropriate backend(s). - + Args: pattern: String pattern to search for path: Path to search in (default "/") @@ -100,11 +109,11 @@ def grep( """ for route_prefix, backend in self.sorted_routes: if path is not None and path.startswith(route_prefix.rstrip("/")): - search_path = path[len(route_prefix) - 1:] + search_path = path[len(route_prefix) - 1 :] result = backend.grep(pattern, search_path if search_path else "/", glob, output_mode) if result.startswith("No matches found"): return result - + lines = result.split("\n") prefixed_lines = [] for line in lines: @@ -118,11 +127,11 @@ def grep( return "\n".join(prefixed_lines) all_results = [] - + default_result = self.default.grep(pattern, path, glob, output_mode) if not default_result.startswith("No matches found"): all_results.append(default_result) - + for route_prefix, backend in self.routes.items(): result = backend.grep(pattern, None, glob, output_mode) if not result.startswith("No matches found"): @@ -137,12 +146,12 @@ def grep( else: prefixed_lines.append(line) all_results.append("\n".join(prefixed_lines)) - + if not all_results: return f"No matches found for pattern: '{pattern}'" - + return "\n".join(all_results) - + def glob(self, pattern: str, path: str = "/") -> list[str]: """Find files matching a glob pattern across all backends. @@ -157,7 +166,7 @@ def glob(self, pattern: str, path: str = "/") -> list[str]: for route_prefix, backend in self.sorted_routes: if path.startswith(route_prefix.rstrip("/")): # Path matches a specific route - search only that backend - search_path = path[len(route_prefix) - 1:] + search_path = path[len(route_prefix) - 1 :] matches = backend.glob(pattern, search_path if search_path else "/") results.extend(f"{route_prefix[:-1]}{match}" for match in matches) return sorted(results) @@ -173,99 +182,40 @@ def glob(self, pattern: str, path: str = "/") -> list[str]: return sorted(results) - -class CompositeStateBacked(_CompositeBackend): - - def __init__( - self, - default: StateBackend, - routes: dict[str, BackendProtocol], - ) -> None: - self.default = default - self.routes = routes - - # Sort routes by length (longest first) for correct prefix matching - self.sorted_routes = sorted(routes.items(), key=lambda x: len(x[0]), reverse=True) - def write( - self, - file_path: str, - content: str, - ) -> Command | str: + self, + file_path: str, + content: str, + ) -> WriteResult: """Create a new file, routing to appropriate backend. Args: file_path: Absolute file path - content: File content as a stringReturns: - Success message or Command object, or error if file already exists. - """ - backend, stripped_key = self._get_backend_and_key(file_path) - return backend.write(stripped_key, content) - - def edit( - self, - file_path: str, - old_string: str, - new_string: str, - replace_all: bool = False, - ) -> Command | str: - """Edit a file, routing to appropriate backend. - - Args: - file_path: Absolute file path - old_string: String to find and replace - new_string: Replacement string - replace_all: If True, replace all occurrencesReturns: - Success message or Command object, or error message on failure. - """ - backend, stripped_key = self._get_backend_and_key(file_path) - return backend.edit(stripped_key, old_string, new_string, replace_all=replace_all) - + content: File content as a string -class CompositeBackend(_CompositeBackend): - def write( - self, - file_path: str, - content: str, - ) -> str: - """Create a new file, routing to appropriate backend. - - Args: - file_path: Absolute file path - content: File content as a stringReturns: - Success message or Command object, or error if file already exists. + Returns: + WriteResult from the appropriate backend. """ backend, stripped_key = self._get_backend_and_key(file_path) return backend.write(stripped_key, content) def edit( - self, - file_path: str, - old_string: str, - new_string: str, - replace_all: bool = False, - ) -> str: + self, + file_path: str, + old_string: str, + new_string: str, + replace_all: bool = False, + ) -> EditResult: """Edit a file, routing to appropriate backend. Args: file_path: Absolute file path old_string: String to find and replace new_string: Replacement string - replace_all: If True, replace all occurrencesReturns: - Success message or Command object, or error message on failure. + replace_all: If True, replace all occurrences + + Returns: + EditResult from the appropriate backend. """ backend, stripped_key = self._get_backend_and_key(file_path) return backend.edit(stripped_key, old_string, new_string, replace_all=replace_all) - -class CompositeStateBackendProvider(StateBackendProvider): - - def __init__(self, routes: dict[str, BackendProtocol | BackendProvider]): - self.routes = routes - - def get_backend(self, runtime: ToolRuntime) -> StateBackendProtocol: - return CompositeStateBacked( - default=StateBackendProvider.get_backend(runtime), - routes={ - k: v if isinstance(v, BackendProtocol) else v.get_backend(runtime) for k,v in self.routes.items() - } - ) \ No newline at end of file diff --git a/src/deepagents/backends/filesystem.py b/src/deepagents/backends/filesystem.py index e36423fc..28e4e994 100644 --- a/src/deepagents/backends/filesystem.py +++ b/src/deepagents/backends/filesystem.py @@ -7,45 +7,45 @@ and optional glob include filtering, while preserving virtual path behavior """ +import json import os import re -import json import subprocess -from datetime import datetime from pathlib import Path -from typing import Any, Optional, TYPE_CHECKING -from langgraph.types import Command -if TYPE_CHECKING: - from langchain.tools import ToolRuntime +import wcmatch.glob as wcglob -from .utils import ( +from deepagents.backends.protocol import Backend, EditResult, WriteResult +from deepagents.backends.utils import ( + _format_grep_results, check_empty_content, format_content_with_line_numbers, perform_string_replacement, - _format_grep_results, - truncate_if_too_long + truncate_if_too_long, ) -import wcmatch.glob as wcglob - -class FilesystemBackend: +class FilesystemBackend(Backend): """Backend that reads and writes files directly from the filesystem. Files are accessed using their actual filesystem paths. Relative paths are resolved relative to the current working directory. Content is read/written as plain text, and metadata (timestamps) are derived from filesystem stats. + + Storage Model: External Storage + -------------------------------- + This backend uses external storage (filesystem). Write and edit operations + persist directly to disk and return WriteResult/EditResult with files_update=None. """ def __init__( - self, - root_dir: Optional[str | Path] = None, + self, + root_dir: str | Path | None = None, virtual_mode: bool = False, max_file_size_mb: int = 10, ) -> None: """Initialize filesystem backend. - + Args: root_dir: Optional root directory for file operations. If provided, all file paths will be resolved relative to this directory. @@ -90,7 +90,7 @@ def ls(self, path: str) -> list[str]: Args: path: Absolute directory path to list files from. - + Returns: List of absolute file paths. """ @@ -115,10 +115,10 @@ def ls(self, path: str) -> list[str]: continue # Strip the cwd prefix if present if abs_path.startswith(cwd_str): - relative_path = abs_path[len(cwd_str):] + relative_path = abs_path[len(cwd_str) :] elif abs_path.startswith(str(self.cwd)): # Handle case where cwd doesn't end with / - relative_path = abs_path[len(str(self.cwd)):].lstrip("/") + relative_path = abs_path[len(str(self.cwd)) :].lstrip("/") else: # Path is outside cwd, return as-is or skip relative_path = abs_path @@ -128,15 +128,15 @@ def ls(self, path: str) -> list[str]: pass return truncate_if_too_long(sorted(results)) - + def read( - self, + self, file_path: str, offset: int = 0, limit: int = 2000, ) -> str: """Read file content with line numbers. - + Args: file_path: Absolute or relative file path offset: Line offset to start reading from (0-indexed) @@ -144,10 +144,10 @@ def read( Formatted file content with line numbers, or error message. """ resolved_path = self._resolve_path(file_path) - + if not resolved_path.exists() or not resolved_path.is_file(): return f"Error: File '{file_path}' not found" - + try: # Open with O_NOFOLLOW where available to avoid symlink traversal try: @@ -156,42 +156,44 @@ def read( content = f.read() except OSError: # Fallback to normal open if O_NOFOLLOW unsupported or fails - with open(resolved_path, "r", encoding="utf-8") as f: + with open(resolved_path, encoding="utf-8") as f: content = f.read() - + empty_msg = check_empty_content(content) if empty_msg: return empty_msg - + lines = content.splitlines() start_idx = offset end_idx = min(start_idx + limit, len(lines)) - + if start_idx >= len(lines): return f"Error: Line offset {offset} exceeds file length ({len(lines)} lines)" - + selected_lines = lines[start_idx:end_idx] return format_content_with_line_numbers(selected_lines, start_line=start_idx + 1) except (OSError, UnicodeDecodeError) as e: return f"Error reading file '{file_path}': {e}" - + def write( - self, + self, file_path: str, content: str, - ) -> Command | str: + ) -> WriteResult: """Create a new file with content. - + Args: file_path: Absolute or relative file path - content: File content as a stringReturns: - Success message or error if file already exists. + content: File content as a string + + Returns: + WriteResult with files_update=None (external storage). """ resolved_path = self._resolve_path(file_path) - + if resolved_path.exists(): - return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path." - + return WriteResult(error=f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path.") + try: # Create parent directories if needed resolved_path.parent.mkdir(parents=True, exist_ok=True) @@ -203,32 +205,37 @@ def write( fd = os.open(resolved_path, flags, 0o644) with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(content) - - return f"Updated file {file_path}" + + return WriteResult( + path=file_path, + files_update=None, # External storage: already persisted to disk + ) except (OSError, UnicodeEncodeError) as e: - return f"Error writing file '{file_path}': {e}" - + return WriteResult(error=f"Error writing file '{file_path}': {e}") + def edit( - self, + self, file_path: str, old_string: str, new_string: str, replace_all: bool = False, - ) -> Command | str: + ) -> EditResult: """Edit a file by replacing string occurrences. - + Args: file_path: Absolute or relative file path old_string: String to find and replace new_string: Replacement string - replace_all: If True, replace all occurrencesReturns: - Success message or error message on failure. + replace_all: If True, replace all occurrences + + Returns: + EditResult with files_update=None (external storage). """ resolved_path = self._resolve_path(file_path) - + if not resolved_path.exists() or not resolved_path.is_file(): - return f"Error: File '{file_path}' not found" - + return EditResult(error=f"Error: File '{file_path}' not found") + try: # Read securely try: @@ -236,16 +243,17 @@ def edit( with os.fdopen(fd, "r", encoding="utf-8") as f: content = f.read() except OSError: - with open(resolved_path, "r", encoding="utf-8") as f: + with open(resolved_path, encoding="utf-8") as f: content = f.read() - + result = perform_string_replacement(content, old_string, new_string, replace_all) - + if isinstance(result, str): - return result - + # Error message from perform_string_replacement + return EditResult(error=result) + new_content, occurrences = result - + # Write securely flags = os.O_WRONLY | os.O_TRUNC if hasattr(os, "O_NOFOLLOW"): @@ -253,16 +261,20 @@ def edit( fd = os.open(resolved_path, flags) with os.fdopen(fd, "w", encoding="utf-8") as f: f.write(new_content) - - return f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'" + + return EditResult( + path=file_path, + files_update=None, # External storage: already persisted to disk + occurrences=occurrences, + ) except (OSError, UnicodeDecodeError, UnicodeEncodeError) as e: - return f"Error editing file '{file_path}': {e}" - + return EditResult(error=f"Error editing file '{file_path}': {e}") + def grep( self, pattern: str, - path: Optional[str] = None, - glob: Optional[str] = None, + path: str | None = None, + glob: str | None = None, output_mode: str = "files_with_matches", ) -> str: """Search for a pattern in files (ripgrep with Python fallback). @@ -301,9 +313,7 @@ def grep( return truncate_if_too_long(_format_grep_results(results, output_mode)) - def _ripgrep_search( - self, pattern: str, base_full: Path, include_glob: Optional[str] - ) -> Optional[dict[str, list[tuple[int, str]]]]: + def _ripgrep_search(self, pattern: str, base_full: Path, include_glob: str | None) -> dict[str, list[tuple[int, str]]] | None: cmd = ["rg", "--json"] if include_glob: cmd.extend(["--glob", include_glob]) @@ -348,9 +358,7 @@ def _ripgrep_search( return results - def _python_search( - self, pattern: str, base_full: Path, include_glob: Optional[str] - ) -> dict[str, list[tuple[int, str]]]: + def _python_search(self, pattern: str, base_full: Path, include_glob: str | None) -> dict[str, list[tuple[int, str]]]: try: regex = re.compile(pattern) except re.error: @@ -385,10 +393,10 @@ def _python_search( results.setdefault(virt_path, []).append((line_num, line)) return results - + def glob(self, pattern: str, path: str = "/") -> list[str]: """Find files matching a glob pattern. - + Args: pattern: Glob pattern (e.g., "**/*.py", "*.txt", "/subdir/**/*.md") path: Base path to search from (default "/")Returns: @@ -396,17 +404,17 @@ def glob(self, pattern: str, path: str = "/") -> list[str]: """ if pattern.startswith("/"): pattern = pattern.lstrip("/") - + if path == "/": search_path = self.cwd else: search_path = self._resolve_path(path) - + if not search_path.exists() or not search_path.is_dir(): return [] - + results = [] - + try: for matched_path in search_path.glob(pattern): if matched_path.is_file(): @@ -414,18 +422,18 @@ def glob(self, pattern: str, path: str = "/") -> list[str]: if not self.virtual_mode: results.append(abs_path) continue - + cwd_str = str(self.cwd) if not cwd_str.endswith("/"): cwd_str += "/" - + if abs_path.startswith(cwd_str): - relative_path = abs_path[len(cwd_str):] + relative_path = abs_path[len(cwd_str) :] elif abs_path.startswith(str(self.cwd)): - relative_path = abs_path[len(str(self.cwd)):].lstrip("/") + relative_path = abs_path[len(str(self.cwd)) :].lstrip("/") else: relative_path = abs_path - + results.append("/" + relative_path) except (OSError, ValueError): pass diff --git a/src/deepagents/backends/protocol.py b/src/deepagents/backends/protocol.py index 5a88020d..b1179873 100644 --- a/src/deepagents/backends/protocol.py +++ b/src/deepagents/backends/protocol.py @@ -1,218 +1,200 @@ -"""Protocol definition for pluggable memory backends. +"""Type definitions for backend interface. -This module defines the BackendProtocol that all backend implementations +This module defines the unified Backend interface that all backend implementations must follow. Backends can store files in different locations (state, filesystem, database, etc.) and provide a uniform interface for file operations. """ -from typing import TYPE_CHECKING, Optional, Protocol, runtime_checkable -from langgraph.types import Command +from abc import ABC, abstractmethod +from collections.abc import Awaitable, Callable +from dataclasses import dataclass +from typing import Any + from langchain.tools import ToolRuntime -@runtime_checkable -class _BackendProtocol(Protocol): - """Protocol for pluggable memory backends. +@dataclass +class WriteResult: + """Result from backend write operations. + + Attributes: + error: Error message on failure, None on success. + path: Absolute path of written file, None on failure. + files_update: State update dict for checkpoint backends, None for external storage. + Checkpoint backends populate this with {file_path: file_data} for LangGraph state. + External backends set None (already persisted to disk/S3/database/etc). + + Examples: + >>> # Checkpoint storage + >>> WriteResult(path="/f.txt", files_update={"/f.txt": {...}}) + >>> # External storage + >>> WriteResult(path="/f.txt", files_update=None) + >>> # Error + >>> WriteResult(error="File exists") + """ + + error: str | None = None + path: str | None = None + files_update: dict[str, Any] | None = None + + +@dataclass +class EditResult: + """Result from backend edit operations. + + Attributes: + error: Error message on failure, None on success. + path: Absolute path of edited file, None on failure. + files_update: State update dict for checkpoint backends, None for external storage. + Checkpoint backends populate this with {file_path: file_data} for LangGraph state. + External backends set None (already persisted to disk/S3/database/etc). + occurrences: Number of replacements made, None on failure. + + Examples: + >>> # Checkpoint storage + >>> EditResult(path="/f.txt", files_update={"/f.txt": {...}}, occurrences=1) + >>> # External storage + >>> EditResult(path="/f.txt", files_update=None, occurrences=2) + >>> # Error + >>> EditResult(error="File not found") + """ + + error: str | None = None + path: str | None = None + files_update: dict[str, Any] | None = None + occurrences: int | None = None + + +class Backend(ABC): + """Abstract interface for pluggable file storage backends. - Backends can store files in different locations (state, filesystem, database, etc.) + Backends store files in different locations (LangGraph state, filesystem, database, etc.) and provide a uniform interface for file operations. - All file data is represented as dicts with the following structure: - { - "content": list[str], # Lines of text content - "created_at": str, # ISO format timestamp - "modified_at": str, # ISO format timestamp - } + Storage Models: + Checkpoint Storage (StateBackend): Files stored in LangGraph state, persisted via + checkpointing. Returns WriteResult/EditResult with files_update populated for + tool layer to convert to Command objects. + + External Storage (FilesystemBackend, StoreBackend, etc.): Files persisted directly + to external systems (disk, S3, BaseStore, database). Returns WriteResult/EditResult + with files_update=None. + + File Data Format: + Checkpoint backends use dicts: {"content": list[str], "created_at": str, "modified_at": str}. + External backends manage their own format internally. """ - + + @abstractmethod def ls(self, path: str) -> list[str]: """List all file paths in a directory. - + Args: - path: Absolute path to directory (e.g., "/", "/subdir/", "/memories/") - + path: Absolute directory path (e.g., "/", "/subdir/", "/memories/"). + Returns: - List of absolute file paths in the specified directory. + List of absolute file paths. Empty list if directory doesn't exist or is empty. """ ... - + + @abstractmethod def read( - self, + self, file_path: str, offset: int = 0, limit: int = 2000, ) -> str: """Read file content with line numbers. - - Args: - file_path: Absolute file path (e.g., "/notes.txt", "/memories/agent.md") - offset: Line offset to start reading from (0-indexed) - limit: Maximum number of lines to read - - Returns: - Formatted file content with line numbers (cat -n style), or error message. - Returns "Error: File '{file_path}' not found" if file doesn't exist. - Returns "System reminder: File exists but has empty contents" for empty files. - """ - ... - - - def grep( - self, - pattern: str, - path: Optional[str] = None, - glob: Optional[str] = None, - output_mode: str = "files_with_matches", - ) -> str: - """Search for a pattern in files. - - TODO: This implementation is significantly less capable than Claude Code's Grep tool. - Missing features to add in the future: - - Context lines: -A (after), -B (before), -C (context) parameters - - Line numbers: -n parameter to show line numbers in output - - Case sensitivity: -i parameter for case-insensitive search - - Output limiting: head_limit parameter for large result sets - - File type filter: type parameter (e.g., "py", "js") - - Multiline support: multiline parameter for cross-line pattern matching - - Pattern semantics: Clarify if pattern is regex or literal string - See /memories/memory_backend_vs_claude_code_comparison.md for full details. - - Args: - pattern: String pattern to search for (currently literal string) - path: Path to search in (default "/") - glob: Optional glob pattern to filter files (e.g., "*.py") - output_mode: Output format - "files_with_matches", "content", or "count" - - files_with_matches: List file paths that contain matches - - content: Show matching lines with file paths and line numbers - - count: Show count of matches per file - - Returns: - Formatted search results based on output_mode, or message if no matches found. - """ - ... - - def glob(self, pattern: str, path: str = "/") -> list[str]: - """Find files matching a glob pattern. Args: - pattern: Glob pattern (e.g., "**/*.py", "*.txt", "/subdir/**/*.md") - path: Base path to search from (default: "/") + file_path: Absolute file path (e.g., "/notes.txt", "/memories/agent.md"). + offset: Line offset to start reading from (0-indexed). + limit: Maximum number of lines to read. Returns: - List of absolute file paths matching the pattern. + Formatted content with line numbers (cat -n style), or error message if file not found. """ ... - -class BackendProtocol(_BackendProtocol): - def write( - self, - file_path: str, - content: str, - ) -> str: + @abstractmethod + def write(self, file_path: str, content: str) -> WriteResult: """Create a new file with content. Args: - file_path: Absolute file path (e.g., "/notes.txt", "/memories/agent.md") - content: File content as a string + file_path: Absolute file path (e.g., "/notes.txt", "/memories/agent.md"). + content: File content as a string. Returns: - - Command object for StateBackend (uses_state=True) to update LangGraph state - - Success message string for other backends, or error if file already exists - - Error cases: - - Returns error message if file already exists (should use edit instead) + WriteResult with error=None on success (files_update populated for checkpoint + backends, None for external). Returns error message if file exists, permission + denied, path traversal attempt, or I/O error. """ ... + @abstractmethod def edit( - self, - file_path: str, - old_string: str, - new_string: str, - replace_all: bool = False, - ) -> str: + self, + file_path: str, + old_string: str, + new_string: str, + replace_all: bool = False, + ) -> EditResult: """Edit a file by replacing string occurrences. Args: - file_path: Absolute file path (e.g., "/notes.txt", "/memories/agent.md") - old_string: String to find and replace - new_string: Replacement string - replace_all: If True, replace all occurrences; if False, require unique match + file_path: Absolute file path (e.g., "/notes.txt", "/memories/agent.md"). + old_string: String to find and replace. + new_string: Replacement string. + replace_all: If True, replace all occurrences; if False, require unique match. Returns: - - Command object for StateBackend (uses_state=True) to update LangGraph state - - Success message string for other backends, or error message on failure - - Error cases: - - "Error: File '{file_path}' not found" if file doesn't exist - - "Error: String not found in file: '{old_string}'" if string not found - - "Error: String '{old_string}' appears {n} times. Use replace_all=True..." - if multiple matches found and replace_all=False + EditResult with error=None on success (files_update populated for checkpoint + backends, None for external). Returns error if file not found, string not found, + or multiple matches without replace_all=True. """ ... - -@runtime_checkable -class BackendProvider(Protocol): - - def get_backend(self, runtime: ToolRuntime) -> BackendProtocol: - """Get the backend.""" - ... - - -class StateBackendProtocol(_BackendProtocol): - - def write( - self, - file_path: str, - content: str, - ) -> Command | str: - """Create a new file with content. + @abstractmethod + def grep( + self, + pattern: str, + path: str | None = None, + glob: str | None = None, + output_mode: str = "files_with_matches", + ) -> str: + """Search for a pattern in files. Args: - file_path: Absolute file path (e.g., "/notes.txt", "/memories/agent.md") - content: File content as a string + pattern: Search pattern (implementation-specific: literal or regex). + path: Directory to search in (default "/"). + glob: Optional glob pattern to filter files (e.g., "*.py"). + output_mode: Output format - "files_with_matches" (file paths), "content" + (matching lines with context), or "count" (match counts per file). Returns: - - Command object for StateBackend (uses_state=True) to update LangGraph state - - Success message string for other backends, or error if file already exists + Formatted search results based on output_mode, or message if no matches found. - Error cases: - - Returns error message if file already exists (should use edit instead) + Note: + This is a basic implementation. Missing features: context lines (-A/-B/-C), + line numbers (-n), case sensitivity (-i), output limiting, file type filters, + multiline support. """ ... - def edit( - self, - file_path: str, - old_string: str, - new_string: str, - replace_all: bool = False, - ) -> Command | str: - """Edit a file by replacing string occurrences. + @abstractmethod + def glob(self, pattern: str, path: str = "/") -> list[str]: + """Find files matching a glob pattern. Args: - file_path: Absolute file path (e.g., "/notes.txt", "/memories/agent.md") - old_string: String to find and replace - new_string: Replacement string - replace_all: If True, replace all occurrences; if False, require unique match + pattern: Glob pattern (e.g., "**/*.py", "*.txt", "/subdir/**/*.md"). + path: Base directory to search from (default "/"). Returns: - - Command object for StateBackend (uses_state=True) to update LangGraph state - - Success message string for other backends, or error message on failure - - Error cases: - - "Error: File '{file_path}' not found" if file doesn't exist - - "Error: String not found in file: '{old_string}'" if string not found - - "Error: String '{old_string}' appears {n} times. Use replace_all=True..." - if multiple matches found and replace_all=False + List of absolute file paths matching pattern. Empty list if no matches. """ ... -@runtime_checkable -class StateBackendProvider(Protocol): - def get_backend(self, runtime: ToolRuntime) -> StateBackendProtocol: - """Get the backend.""" - ... \ No newline at end of file +# Backend factory function types +BackendProvider = Callable[[ToolRuntime], Backend] +AsyncBackendProvider = Callable[[ToolRuntime], Awaitable[Backend]] diff --git a/src/deepagents/backends/state.py b/src/deepagents/backends/state.py index 37cd7bce..7c74f342 100644 --- a/src/deepagents/backends/state.py +++ b/src/deepagents/backends/state.py @@ -1,49 +1,49 @@ """StateBackend: Store files in LangGraph agent state (ephemeral).""" -import re -from typing import Any, Literal, Optional, TYPE_CHECKING +from typing import Literal -from langchain.tools import ToolRuntime - -from langchain_core.messages import ToolMessage -from langgraph.types import Command - -from .utils import ( +from deepagents.backends.protocol import Backend, EditResult, WriteResult +from deepagents.backends.utils import ( + _glob_search_files, + _grep_search_files, create_file_data, - update_file_data, file_data_to_string, format_read_response, perform_string_replacement, truncate_if_too_long, - _glob_search_files, - _grep_search_files, + update_file_data, ) -class StateBackend: +class StateBackend(Backend): """Backend that stores files in agent state (ephemeral). - + Uses LangGraph's state management and checkpointing. Files persist within a conversation thread but not across threads. State is automatically checkpointed after each agent step. - - Special handling: Since LangGraph state must be updated via Command objects - (not direct mutation), operations return Command objects instead of None. - This is indicated by the uses_state=True flag. + + Storage Model: Checkpoint Storage + --------------------------------- + This backend stores files in LangGraph state, persisted via LangGraph's checkpoint + system (Postgres, Redis, in-memory, etc.). Write and edit operations return + WriteResult/EditResult with files_update populated, which the tool layer converts + into Command objects for state updates. """ - + def __init__(self, runtime: "ToolRuntime"): """Initialize StateBackend with runtime. - - Args:""" + + Args: + runtime: Tool runtime with access to LangGraph state + """ self.runtime = runtime - + def ls(self, path: str) -> list[str]: """List files from state. - + Args: path: Absolute path to directory. - + Returns: List of file paths. """ @@ -51,15 +51,15 @@ def ls(self, path: str) -> list[str]: keys = list(files.keys()) keys = [k for k in keys if k.startswith(path)] return truncate_if_too_long(keys) - + def read( - self, + self, file_path: str, offset: int = 0, limit: int = 2000, ) -> str: """Read file content with line numbers. - + Args: file_path: Absolute file path offset: Line offset to start reading from (0-indexed) @@ -68,97 +68,80 @@ def read( """ files = self.runtime.state.get("files", {}) file_data = files.get(file_path) - + if file_data is None: return f"Error: File '{file_path}' not found" - + return format_read_response(file_data, offset, limit) - + def write( - self, + self, file_path: str, content: str, - ) -> Command | str: + ) -> WriteResult: """Create a new file with content. - + Args: file_path: Absolute file path - content: File content as a stringReturns: - Command object to update state, or error message if file exists. + content: File content as a string + + Returns: + WriteResult with files_update populated for framework state update. """ files = self.runtime.state.get("files", {}) - + if file_path in files: - return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path." - + return WriteResult(error=f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path.") + new_file_data = create_file_data(content) - tool_call_id = self.runtime.tool_call_id - - return Command( - update={ - "files": {file_path: new_file_data}, - "messages": [ - ToolMessage( - content=f"Updated file {file_path}", - tool_call_id=tool_call_id, - ) - ], - } - ) - + + return WriteResult(path=file_path, files_update={file_path: new_file_data}) + def edit( - self, + self, file_path: str, old_string: str, new_string: str, replace_all: bool = False, - ) -> Command | str: + ) -> EditResult: """Edit a file by replacing string occurrences. - + Args: file_path: Absolute file path old_string: String to find and replace new_string: Replacement string - replace_all: If True, replace all occurrencesReturns: - Command object to update state, or error message on failure. + replace_all: If True, replace all occurrences + + Returns: + EditResult with files_update populated for framework state update. """ files = self.runtime.state.get("files", {}) file_data = files.get(file_path) - + if file_data is None: - return f"Error: File '{file_path}' not found" - + return EditResult(error=f"Error: File '{file_path}' not found") + content = file_data_to_string(file_data) result = perform_string_replacement(content, old_string, new_string, replace_all) - + if isinstance(result, str): - return result - + # Error message from perform_string_replacement + return EditResult(error=result) + new_content, occurrences = result new_file_data = update_file_data(file_data, new_content) - tool_call_id = self.runtime.tool_call_id - - return Command( - update={ - "files": {file_path: new_file_data}, - "messages": [ - ToolMessage( - content=f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'", - tool_call_id=tool_call_id, - ) - ], - } - ) - + + return EditResult(path=file_path, files_update={file_path: new_file_data}, occurrences=occurrences) + def grep( self, pattern: str, path: str = "/", - glob: Optional[str] = None, + glob: str | None = None, output_mode: Literal["files_with_matches", "content", "count"] = "files_with_matches", ) -> str: """Search for a pattern in files. - + Args: pattern: String pattern to search for path: Path to search in (default "/") @@ -167,25 +150,20 @@ def grep( Formatted search results based on output_mode. """ files = self.runtime.state.get("files", {}) - + return truncate_if_too_long(_grep_search_files(files, pattern, path, glob, output_mode)) - + def glob(self, pattern: str, path: str = "/") -> list[str]: """Find files matching a glob pattern. - + Args: pattern: Glob pattern (e.g., "**/*.py", "*.txt", "/subdir/**/*.md") path: Base path to search from (default "/")Returns: List of absolute file paths matching the pattern. """ files = self.runtime.state.get("files", {}) - + result = _glob_search_files(files, pattern, path) if result == "No files found": return [] return truncate_if_too_long(result.split("\n")) - -class StateBackendProvider: - - def get_backend(self, runtime: ToolRuntime): - return StateBackend(runtime) \ No newline at end of file diff --git a/src/deepagents/backends/store.py b/src/deepagents/backends/store.py index f0616830..c8f320e4 100644 --- a/src/deepagents/backends/store.py +++ b/src/deepagents/backends/store.py @@ -1,48 +1,55 @@ """StoreBackend: Adapter for LangGraph's BaseStore (persistent, cross-thread).""" -import re -from typing import Any, Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from langchain.tools import ToolRuntime from langgraph.config import get_config from langgraph.store.base import BaseStore, Item -from langgraph.types import Command +from deepagents.backends.protocol import Backend, EditResult, WriteResult from deepagents.backends.utils import ( + _glob_search_files, + _grep_search_files, create_file_data, - update_file_data, file_data_to_string, format_read_response, perform_string_replacement, truncate_if_too_long, - _glob_search_files, - _grep_search_files, + update_file_data, ) -class StoreBackend: +class StoreBackend(Backend): """Backend that stores files in LangGraph's BaseStore (persistent). - + Uses LangGraph's Store for persistent, cross-conversation storage. Files are organized via namespaces and persist across all threads. - + The namespace can include an optional assistant_id for multi-agent isolation. + + Storage Model: External Storage + -------------------------------- + This backend uses external storage (LangGraph BaseStore). Write and edit + operations persist directly to the store and return WriteResult/EditResult + with files_update=None. """ + def __init__(self, runtime: "ToolRuntime"): """Initialize StoreBackend with runtime. - - Args:""" - self.runtime = runtime + Args: + runtime: Tool runtime with access to LangGraph BaseStore + """ + self.runtime = runtime def _get_store(self) -> BaseStore: """Get the store instance. - + Args:Returns: BaseStore instance - + Raises: ValueError: If no store is available or runtime not provided """ @@ -51,14 +58,14 @@ def _get_store(self) -> BaseStore: msg = "Store is required but not available in runtime" raise ValueError(msg) return store - + def _get_namespace(self) -> tuple[str, ...]: """Get the namespace for store operations. - + Returns a tuple for organizing files in the store. If an assistant_id is available in the config metadata, returns (assistant_id, "filesystem") to provide per-assistant isolation. Otherwise, returns ("filesystem",). - + Returns: Namespace tuple for store operations. """ @@ -70,16 +77,16 @@ def _get_namespace(self) -> tuple[str, ...]: if assistant_id is None: return (namespace,) return (assistant_id, namespace) - + def _convert_store_item_to_file_data(self, store_item: Item) -> dict[str, Any]: """Convert a store Item to FileData format. - + Args: store_item: The store Item containing file data. - + Returns: FileData dict with content, created_at, and modified_at fields. - + Raises: ValueError: If required fields are missing or have incorrect types. """ @@ -97,13 +104,13 @@ def _convert_store_item_to_file_data(self, store_item: Item) -> dict[str, Any]: "created_at": store_item.value["created_at"], "modified_at": store_item.value["modified_at"], } - + def _convert_file_data_to_store_value(self, file_data: dict[str, Any]) -> dict[str, Any]: """Convert FileData to a dict suitable for store.put(). - + Args: file_data: The FileData to convert. - + Returns: Dictionary with content, created_at, and modified_at fields. """ @@ -159,133 +166,145 @@ def _search_store_paginated( offset += page_size return all_items - + def ls(self, path: str) -> list[str]: """List files from store. - + Args: path: Absolute path to directory. - + Returns: List of file paths. """ store = self._get_store() namespace = self._get_namespace() - + # Search store with path filter items = self._search_store_paginated(store, namespace, filter={"prefix": path}) - + return truncate_if_too_long([item.key for item in items]) - + def read( - self, + self, file_path: str, offset: int = 0, limit: int = 2000, ) -> str: """Read file content with line numbers. - + Args: file_path: Absolute file path offset: Line offset to start reading from (0-indexed)limit: Maximum number of lines to read - + Returns: Formatted file content with line numbers, or error message. """ store = self._get_store() namespace = self._get_namespace() - item: Optional[Item] = store.get(namespace, file_path) - + item: Item | None = store.get(namespace, file_path) + if item is None: return f"Error: File '{file_path}' not found" - + try: file_data = self._convert_store_item_to_file_data(item) except ValueError as e: return f"Error: {e}" - + return format_read_response(file_data, offset, limit) - + def write( - self, + self, file_path: str, content: str, - ) -> Command | str: + ) -> WriteResult: """Create a new file with content. - + Args: file_path: Absolute file path - content: File content as a stringReturns: - Success message or error if file already exists. + content: File content as a string + + Returns: + WriteResult with files_update=None (external storage). """ store = self._get_store() namespace = self._get_namespace() - + # Check if file exists existing = store.get(namespace, file_path) if existing is not None: - return f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path." - - # Create new file + return WriteResult(error=f"Cannot write to {file_path} because it already exists. Read and then make an edit, or write to a new path.") + + # Create new file and persist to store file_data = create_file_data(content) store_value = self._convert_file_data_to_store_value(file_data) store.put(namespace, file_path, store_value) - - return f"Updated file {file_path}" - + + return WriteResult( + path=file_path, + files_update=None, # External storage: already persisted to store + ) + def edit( - self, + self, file_path: str, old_string: str, new_string: str, replace_all: bool = False, - ) -> Command | str: + ) -> EditResult: """Edit a file by replacing string occurrences. - + Args: file_path: Absolute file path old_string: String to find and replace new_string: Replacement string - replace_all: If True, replace all occurrencesReturns: - Success message or error message on failure. + replace_all: If True, replace all occurrences + + Returns: + EditResult with files_update=None (external storage). """ store = self._get_store() namespace = self._get_namespace() - + # Get existing file item = store.get(namespace, file_path) if item is None: - return f"Error: File '{file_path}' not found" - + return EditResult(error=f"Error: File '{file_path}' not found") + try: file_data = self._convert_store_item_to_file_data(item) except ValueError as e: - return f"Error: {e}" - + return EditResult(error=f"Error: {e}") + content = file_data_to_string(file_data) result = perform_string_replacement(content, old_string, new_string, replace_all) - + if isinstance(result, str): - return result - + # Error message from perform_string_replacement + return EditResult(error=result) + new_content, occurrences = result new_file_data = update_file_data(file_data, new_content) - + # Update file in store store_value = self._convert_file_data_to_store_value(new_file_data) store.put(namespace, file_path, store_value) - - return f"Successfully replaced {occurrences} instance(s) of the string in '{file_path}'" - + + return EditResult( + path=file_path, + files_update=None, # External storage: already persisted to store + occurrences=occurrences, + ) + def grep( self, pattern: str, path: str = "/", - glob: Optional[str] = None, + glob: str | None = None, output_mode: str = "files_with_matches", ) -> str: """Search for a pattern in files. - + Args: pattern: String pattern to search for path: Path to search in (default "/") @@ -295,9 +314,9 @@ def grep( """ store = self._get_store() namespace = self._get_namespace() - + items = self._search_store_paginated(store, namespace) - + files = {} for item in items: if item is None: @@ -307,12 +326,12 @@ def grep( files[item.key] = file_data except ValueError: continue - + return truncate_if_too_long(_grep_search_files(files, pattern, path, glob, output_mode)) - + def glob(self, pattern: str, path: str = "/") -> list[str]: """Find files matching a glob pattern. - + Args: pattern: Glob pattern (e.g., "**/*.py", "*.txt", "/subdir/**/*.md") path: Base path to search from (default "/")Returns: @@ -320,9 +339,9 @@ def glob(self, pattern: str, path: str = "/") -> list[str]: """ store = self._get_store() namespace = self._get_namespace() - + items = self._search_store_paginated(store, namespace) - + files = {} for item in items: if item is None: @@ -337,18 +356,3 @@ def glob(self, pattern: str, path: str = "/") -> list[str]: if result == "No files found": return [] return truncate_if_too_long(result.split("\n")) - - -class StoreBackendProvider: - """Provider for StoreBackend that creates instances with runtime.""" - - def get_backend(self, runtime: "ToolRuntime") -> StoreBackend: - """Create a StoreBackend instance with the given runtime. - - Args: - runtime: The ToolRuntime instance to pass to StoreBackend. - - Returns: - Configured StoreBackend instance. - """ - return StoreBackend(runtime) diff --git a/src/deepagents/backends/utils.py b/src/deepagents/backends/utils.py index b7e527bd..71bbb293 100644 --- a/src/deepagents/backends/utils.py +++ b/src/deepagents/backends/utils.py @@ -1,11 +1,12 @@ """Shared utility functions for memory backend implementations.""" import re -import wcmatch.glob as wcglob from datetime import UTC, datetime from pathlib import Path from typing import Any, Literal +import wcmatch.glob as wcglob + EMPTY_CONTENT_WARNING = "System reminder: File exists but has empty contents" MAX_LINE_LENGTH = 2000 LINE_NUMBER_WIDTH = 6 @@ -18,11 +19,11 @@ def format_content_with_line_numbers( start_line: int = 1, ) -> str: """Format file content with line numbers (cat -n style). - + Args: content: File content as string or list of lines start_line: Starting line number (default: 1) - + Returns: Formatted content with line numbers """ @@ -32,19 +33,16 @@ def format_content_with_line_numbers( lines = lines[:-1] else: lines = content - - return "\n".join( - f"{i + start_line:{LINE_NUMBER_WIDTH}d}\t{line[:MAX_LINE_LENGTH]}" - for i, line in enumerate(lines) - ) + + return "\n".join(f"{i + start_line:{LINE_NUMBER_WIDTH}d}\t{line[:MAX_LINE_LENGTH]}" for i, line in enumerate(lines)) def check_empty_content(content: str) -> str | None: """Check if content is empty and return warning message. - + Args: content: Content to check - + Returns: Warning message if empty, None otherwise """ @@ -55,10 +53,10 @@ def check_empty_content(content: str) -> str | None: def file_data_to_string(file_data: dict[str, Any]) -> str: """Convert FileData to plain string content. - + Args: file_data: FileData dict with 'content' key - + Returns: Content as string with lines joined by newlines """ @@ -67,18 +65,18 @@ def file_data_to_string(file_data: dict[str, Any]) -> str: def create_file_data(content: str, created_at: str | None = None) -> dict[str, Any]: """Create a FileData object with timestamps. - + Args: content: File content as string created_at: Optional creation timestamp (ISO format) - + Returns: FileData dict with content and timestamps """ lines = content.split("\n") if isinstance(content, str) else content - lines = [line[i:i+MAX_LINE_LENGTH] for line in lines for i in range(0, len(line) or 1, MAX_LINE_LENGTH)] + lines = [line[i : i + MAX_LINE_LENGTH] for line in lines for i in range(0, len(line) or 1, MAX_LINE_LENGTH)] now = datetime.now(UTC).isoformat() - + return { "content": lines, "created_at": created_at or now, @@ -88,18 +86,18 @@ def create_file_data(content: str, created_at: str | None = None) -> dict[str, A def update_file_data(file_data: dict[str, Any], content: str) -> dict[str, Any]: """Update FileData with new content, preserving creation timestamp. - + Args: file_data: Existing FileData dict content: New content as string - + Returns: Updated FileData dict """ lines = content.split("\n") if isinstance(content, str) else content - lines = [line[i:i+MAX_LINE_LENGTH] for line in lines for i in range(0, len(line) or 1, MAX_LINE_LENGTH)] + lines = [line[i : i + MAX_LINE_LENGTH] for line in lines for i in range(0, len(line) or 1, MAX_LINE_LENGTH)] now = datetime.now(UTC).isoformat() - + return { "content": lines, "created_at": file_data["created_at"], @@ -113,12 +111,12 @@ def format_read_response( limit: int, ) -> str: """Format file data for read response with line numbers. - + Args: file_data: FileData dict offset: Line offset (0-indexed) limit: Maximum number of lines - + Returns: Formatted content or error message """ @@ -126,14 +124,14 @@ def format_read_response( empty_msg = check_empty_content(content) if empty_msg: return empty_msg - + lines = content.splitlines() start_idx = offset end_idx = min(start_idx + limit, len(lines)) - + if start_idx >= len(lines): return f"Error: Line offset {offset} exceeds file length ({len(lines)} lines)" - + selected_lines = lines[start_idx:end_idx] return format_content_with_line_numbers(selected_lines, start_line=start_idx + 1) @@ -145,24 +143,24 @@ def perform_string_replacement( replace_all: bool, ) -> tuple[str, int] | str: """Perform string replacement with occurrence validation. - + Args: content: Original content old_string: String to replace new_string: Replacement string replace_all: Whether to replace all occurrences - + Returns: Tuple of (new_content, occurrences) on success, or error message string """ occurrences = content.count(old_string) - + if occurrences == 0: return f"Error: String not found in file: '{old_string}'" - + if occurrences > 1 and not replace_all: return f"Error: String '{old_string}' appears {occurrences} times in file. Use replace_all=True to replace all instances, or provide a more specific string with surrounding context." - + new_content = content.replace(old_string, new_string) return new_content, occurrences @@ -174,33 +172,33 @@ def truncate_if_too_long(result: list[str] | str) -> list[str] | str: if total_chars > TOOL_RESULT_TOKEN_LIMIT * 4: return result[: len(result) * TOOL_RESULT_TOKEN_LIMIT * 4 // total_chars] + [TRUNCATION_GUIDANCE] return result - else: # string - if len(result) > TOOL_RESULT_TOKEN_LIMIT * 4: - return result[: TOOL_RESULT_TOKEN_LIMIT * 4] + "\n" + TRUNCATION_GUIDANCE - return result + # string + if len(result) > TOOL_RESULT_TOKEN_LIMIT * 4: + return result[: TOOL_RESULT_TOKEN_LIMIT * 4] + "\n" + TRUNCATION_GUIDANCE + return result def _validate_path(path: str | None) -> str: """Validate and normalize a path. - + Args: path: Path to validate - + Returns: Normalized path starting with / - + Raises: ValueError: If path is invalid """ path = path or "/" if not path or path.strip() == "": raise ValueError("Path cannot be empty") - + normalized = path if path.startswith("/") else "/" + path - + if not normalized.endswith("/"): normalized += "/" - + return normalized @@ -210,16 +208,16 @@ def _glob_search_files( path: str = "/", ) -> str: """Search files dict for paths matching glob pattern. - + Args: files: Dictionary of file paths to FileData. pattern: Glob pattern (e.g., "*.py", "**/*.ts"). path: Base path to search from. - + Returns: Newline-separated file paths, sorted by modification time (most recent first). Returns "No files found" if no matches. - + Example: ```python files = {"/src/main.py": FileData(...), "/test.py": FileData(...)} @@ -256,29 +254,28 @@ def _format_grep_results( output_mode: Literal["files_with_matches", "content", "count"], ) -> str: """Format grep search results based on output mode. - + Args: results: Dictionary mapping file paths to list of (line_num, line_content) tuples output_mode: Output format - "files_with_matches", "content", or "count" - + Returns: Formatted string output """ if output_mode == "files_with_matches": return "\n".join(sorted(results.keys())) - elif output_mode == "count": + if output_mode == "count": lines = [] for file_path in sorted(results.keys()): count = len(results[file_path]) lines.append(f"{file_path}: {count}") return "\n".join(lines) - else: - lines = [] - for file_path in sorted(results.keys()): - lines.append(f"{file_path}:") - for line_num, line in results[file_path]: - lines.append(f" {line_num}: {line}") - return "\n".join(lines) + lines = [] + for file_path in sorted(results.keys()): + lines.append(f"{file_path}:") + for line_num, line in results[file_path]: + lines.append(f" {line_num}: {line}") + return "\n".join(lines) def _grep_search_files( @@ -289,17 +286,17 @@ def _grep_search_files( output_mode: Literal["files_with_matches", "content", "count"] = "files_with_matches", ) -> str: """Search file contents for regex pattern. - + Args: files: Dictionary of file paths to FileData. pattern: Regex pattern to search for. path: Base path to search from. glob: Optional glob pattern to filter files (e.g., "*.py"). output_mode: Output format - "files_with_matches", "content", or "count". - + Returns: Formatted search results. Returns "No matches found" if no results. - + Example: ```python files = {"/file.py": FileData(content=["import os", "print('hi')"], ...)} diff --git a/src/deepagents/graph.py b/src/deepagents/graph.py index 32e9adf1..6655df19 100644 --- a/src/deepagents/graph.py +++ b/src/deepagents/graph.py @@ -17,7 +17,7 @@ from langgraph.store.base import BaseStore from langgraph.types import Checkpointer -from deepagents.backends.protocol import BackendProtocol +from deepagents.backends.protocol import BackendProvider from deepagents.middleware.filesystem import FilesystemMiddleware from deepagents.middleware.patch_tool_calls import PatchToolCallsMiddleware from deepagents.middleware.subagents import CompiledSubAgent, SubAgent, SubAgentMiddleware @@ -48,7 +48,7 @@ def create_deep_agent( context_schema: type[Any] | None = None, checkpointer: Checkpointer | None = None, store: BaseStore | None = None, - backend: BackendProtocol | None = None, + backend: BackendProvider | None = None, interrupt_on: dict[str, bool | InterruptOnConfig] | None = None, debug: bool = False, name: str | None = None, @@ -80,9 +80,10 @@ def create_deep_agent( context_schema: The schema of the deep agent. checkpointer: Optional checkpointer for persisting agent state between runs. store: Optional store for persistent storage (required if backend uses StoreBackend). - backend: Optional backend provider for file storage. Defaults to StateBackendProvider - (ephemeral storage in agent state). For persistent memory, use CompositeStateBackendProvider. - Example: CompositeStateBackendProvider(routes={"/memories/": StoreBackendProvider()}) + backend: Optional backend factory for file storage. A callable that takes ToolRuntime + and returns a Backend instance. Defaults to StateBackendProvider (ephemeral storage + in agent state). For persistent memory with hybrid storage, use CompositeBackendProvider. + Example: CompositeBackendProvider(StateBackendProvider, routes={"/memories/": StoreBackendProvider}) interrupt_on: Optional Dict[str, bool | InterruptOnConfig] mapping tool names to interrupt configs. debug: Whether to enable debug mode. Passed through to create_agent. diff --git a/src/deepagents/middleware/filesystem.py b/src/deepagents/middleware/filesystem.py index fbcebeed..a40e7939 100644 --- a/src/deepagents/middleware/filesystem.py +++ b/src/deepagents/middleware/filesystem.py @@ -1,12 +1,9 @@ """Middleware for providing filesystem tools to an agent.""" # ruff: noqa: E501 -from collections.abc import Awaitable, Callable, Sequence -from typing import Annotated -from typing_extensions import NotRequired - import os -from typing import Literal, Optional +from collections.abc import Awaitable, Callable, Sequence +from typing import Annotated, Literal, NotRequired from langchain.agents.middleware.types import ( AgentMiddleware, @@ -21,9 +18,8 @@ from langgraph.types import Command from typing_extensions import TypedDict -from deepagents.backends.protocol import BackendProtocol, StateBackendProtocol, StateBackendProvider, BackendProvider -from deepagents.backends import StateBackend, CompositeBackend -from deepagents.backends.state import StateBackendProvider +from deepagents.backends.protocol import Backend, BackendProvider +from deepagents.backends.state import StateBackend from deepagents.backends.utils import ( create_file_data, format_content_with_line_numbers, @@ -34,7 +30,6 @@ LINE_NUMBER_WIDTH = 6 DEFAULT_READ_OFFSET = 0 DEFAULT_READ_LIMIT = 2000 -BACKEND_TYPES = BackendProvider | BackendProtocol | StateBackendProtocol | StateBackendProtocol class FileData(TypedDict): @@ -130,6 +125,7 @@ def _validate_path(path: str, *, allowed_prefixes: Sequence[str] | None = None) return normalized + class FilesystemState(AgentState): """State for the filesystem middleware.""" @@ -221,21 +217,29 @@ class FilesystemState(AgentState): - grep: search for text within files""" -def _get_backend(backend: BACKEND_TYPES, runtime: ToolRuntime) -> StateBackendProtocol | BackendProtocol: - if isinstance(backend, (StateBackendProvider, BackendProvider)): - return backend.get_backend(runtime) - else: - return backend +def _get_backend(backend: Backend | BackendProvider, runtime: ToolRuntime) -> Backend: + """Get a backend instance from a Backend or BackendProvider. + + Args: + backend: Either a Backend instance or a BackendProvider callable + runtime: ToolRuntime to pass to the provider if needed + + Returns: + Backend instance + """ + if callable(backend): + return backend(runtime) + return backend def _ls_tool_generator( - backend: BackendProtocol | Callable[[ToolRuntime], BackendProtocol], + backend: Backend | BackendProvider, custom_description: str | None = None, ) -> BaseTool: """Generate the ls (list files) tool. Args: - backend: Backend to use for file storage, or a factory function that takes runtime and returns a backend. + backend: Backend instance or BackendProvider callable custom_description: Optional custom description for the tool. Returns: @@ -254,13 +258,13 @@ def ls(runtime: ToolRuntime[None, FilesystemState], path: str) -> list[str]: def _read_file_tool_generator( - backend: BackendProtocol | Callable[[ToolRuntime], BackendProtocol], + backend: Backend | BackendProvider, custom_description: str | None = None, ) -> BaseTool: """Generate the read_file tool. Args: - backend: Backend to use for file storage, or a factory function that takes runtime and returns a backend. + backend: Backend instance or BackendProvider callable custom_description: Optional custom description for the tool. Returns: @@ -283,13 +287,13 @@ def read_file( def _write_file_tool_generator( - backend: BackendProtocol | Callable[[ToolRuntime], BackendProtocol], + backend: Backend | BackendProvider, custom_description: str | None = None, ) -> BaseTool: """Generate the write_file tool. Args: - backend: Backend to use for file storage, or a factory function that takes runtime and returns a backend. + backend: Backend instance or BackendProvider callable custom_description: Optional custom description for the tool. Returns: @@ -305,19 +309,30 @@ def write_file( ) -> Command | str: resolved_backend = _get_backend(backend, runtime) file_path = _validate_path(file_path) - return resolved_backend.write(file_path, content) + result = resolved_backend.write(file_path, content) + + # Convert WriteResult to Command or string + if result.error: + return result.error + + if result.files_update: + # Checkpoint storage: return Command to update state + return Command(update={"files": result.files_update}) + + # External storage: return success message + return f"Updated file {result.path}" return write_file def _edit_file_tool_generator( - backend: BackendProtocol | Callable[[ToolRuntime], BackendProtocol], + backend: Backend | BackendProvider, custom_description: str | None = None, ) -> BaseTool: """Generate the edit_file tool. Args: - backend: Backend to use for file storage, or a factory function that takes runtime and returns a backend. + backend: Backend instance or BackendProvider callable custom_description: Optional custom description for the tool. Returns: @@ -336,19 +351,30 @@ def edit_file( ) -> Command | str: resolved_backend = _get_backend(backend, runtime) file_path = _validate_path(file_path) - return resolved_backend.edit(file_path, old_string, new_string, replace_all=replace_all) + result = resolved_backend.edit(file_path, old_string, new_string, replace_all=replace_all) + + # Convert EditResult to Command or string + if result.error: + return result.error + + if result.files_update: + # Checkpoint storage: return Command to update state + return Command(update={"files": result.files_update}) + + # External storage: return success message + return f"Successfully replaced {result.occurrences} instance(s) of the string in '{result.path}'" return edit_file def _glob_tool_generator( - backend: BackendProtocol | Callable[[ToolRuntime], BackendProtocol], + backend: Backend | BackendProvider, custom_description: str | None = None, ) -> BaseTool: """Generate the glob tool. Args: - backend: Backend to use for file storage, or a factory function that takes runtime and returns a backend. + backend: Backend instance or BackendProvider callable custom_description: Optional custom description for the tool. Returns: @@ -365,13 +391,13 @@ def glob(pattern: str, runtime: ToolRuntime[None, FilesystemState], path: str = def _grep_tool_generator( - backend: BackendProtocol | Callable[[ToolRuntime], BackendProtocol], + backend: Backend | BackendProvider, custom_description: str | None = None, ) -> BaseTool: """Generate the grep tool. Args: - backend: Backend to use for file storage, or a factory function that takes runtime and returns a backend. + backend: Backend instance or BackendProvider callable custom_description: Optional custom description for the tool. Returns: @@ -383,7 +409,7 @@ def _grep_tool_generator( def grep( pattern: str, runtime: ToolRuntime[None, FilesystemState], - path: Optional[str] = None, + path: str | None = None, glob: str | None = None, output_mode: Literal["files_with_matches", "content", "count"] = "files_with_matches", ) -> str: @@ -404,13 +430,13 @@ def grep( def _get_filesystem_tools( - backend: BackendProtocol, + backend: Backend | BackendProvider, custom_tool_descriptions: dict[str, str] | None = None, ) -> list[BaseTool]: """Get filesystem tools. Args: - backend: Backend to use for file storage, or a factory function that takes runtime and returns a backend. + backend: Backend instance or BackendProvider callable custom_tool_descriptions: Optional custom descriptions for tools. Returns: @@ -438,41 +464,32 @@ def _get_filesystem_tools( class FilesystemMiddleware(AgentMiddleware): """Middleware for providing filesystem tools to an agent. - This middleware adds six filesystem tools to the agent: ls, read_file, write_file, - edit_file, glob, and grep. Files can be stored using any backend that implements - the BackendProtocol. + This middleware adds six filesystem tools to the agent: ls, read_file, write_file, + edit_file, glob, and grep. Files can be stored using any backend that implements + the BackendProtocol. Args: -<<<<<<< HEAD - memory_backend: Backend for file storage. If not provided, defaults to StateBackend - (ephemeral storage in agent state). For persistent storage or hybrid setups, - use CompositeBackend with custom routes. -======= - long_term_memory: Whether to enable longterm memory support. -<<<<<<< HEAD ->>>>>>> master -======= ->>>>>>> master - system_prompt: Optional custom system prompt override. - custom_tool_descriptions: Optional custom tool descriptions override. - tool_token_limit_before_evict: Optional token limit before evicting a tool result to the filesystem. + memory_backend: Backend for file storage. If not provided, defaults to StateBackend + (ephemeral storage in agent state). For persistent storage or hybrid setups, + use CompositeBackend with custom routes. + long_term_memory: Whether to enable longterm memory support. + system_prompt: Optional custom system prompt override. + custom_tool_descriptions: Optional custom tool descriptions override. + tool_token_limit_before_evict: Optional token limit before evicting a tool result to the filesystem. Example: - ```python - from deepagents.middleware.filesystem import FilesystemMiddleware - from deepagents.memory.backends import StateBackend, StoreBackend, CompositeBackend - from langchain.agents import create_agent - - # Ephemeral storage only (default) - agent = create_agent(middleware=[FilesystemMiddleware()]) - - # With hybrid storage (ephemeral + persistent /memories/) - backend = CompositeBackend( - default=StateBackend(), - routes={"/memories/": StoreBackend()} - ) - agent = create_agent(middleware=[FilesystemMiddleware(memory_backend=backend)]) - ``` + ```python + from deepagents.middleware.filesystem import FilesystemMiddleware + from deepagents.memory.backends import StateBackend, StoreBackend, CompositeBackend + from langchain.agents import create_agent + + # Ephemeral storage only (default) + agent = create_agent(middleware=[FilesystemMiddleware()]) + + # With hybrid storage (ephemeral + persistent /memories/) + backend = CompositeBackend(default=StateBackend(), routes={"/memories/": StoreBackend()}) + agent = create_agent(middleware=[FilesystemMiddleware(memory_backend=backend)]) + ``` """ state_schema = FilesystemState @@ -480,7 +497,7 @@ class FilesystemMiddleware(AgentMiddleware): def __init__( self, *, - backend: BACKEND_TYPES | None = None, + backend: Backend | BackendProvider | None = None, system_prompt: str | None = None, custom_tool_descriptions: dict[str, str] | None = None, tool_token_limit_before_evict: int | None = 20000, @@ -488,15 +505,18 @@ def __init__( """Initialize the filesystem middleware. Args: - memory_backend: Backend for file storage. Defaults to StateBackend if not provided. + backend: Backend instance or BackendProvider callable. Defaults to StateBackendProvider if not provided. system_prompt: Optional custom system prompt override. custom_tool_descriptions: Optional custom tool descriptions override. tool_token_limit_before_evict: Optional token limit before evicting a tool result to the filesystem. """ self.tool_token_limit_before_evict = tool_token_limit_before_evict - # Use provided backend or default to StateBackend factory - self.backend = backend if backend is not None else StateBackendProvider() + def _get_state_backend(runtime: ToolRuntime) -> Backend: + return StateBackend(runtime) + + # Use provided backend or default to StateBackendProvider function + self.backend = backend if backend is not None else _get_state_backend # Set system prompt (allow full override) self.system_prompt = system_prompt if system_prompt is not None else FILESYSTEM_SYSTEM_PROMPT @@ -627,4 +647,4 @@ async def awrap_tool_call( return await handler(request) tool_result = await handler(request) - return self._intercept_large_tool_result(tool_result) \ No newline at end of file + return self._intercept_large_tool_result(tool_result)