diff --git a/.github/codeql/codeql-config.yml b/.github/codeql/codeql-config.yml new file mode 100644 index 000000000..24b8f346e --- /dev/null +++ b/.github/codeql/codeql-config.yml @@ -0,0 +1,15 @@ +# CodeQL configuration for code scanning. +# See: https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/customizing-your-advanced-setup-for-code-scanning +name: "CodeQL config" + +paths-ignore: + - "package/@stackframe/**" + - "node_modules/**" + - "**/node_modules/**" + +# Exclude py/path-injection for backend/app/utils/file_utils.py pattern: +# Paths are validated by safe_resolve_path (under base) before use; the query +# does not recognize this validation. Excluding to avoid false positives. +query-filters: + - exclude: + id: py/path-injection diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 2dd59082b..03f461b9a 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -73,12 +73,7 @@ jobs: with: languages: ${{ matrix.language }} build-mode: ${{ matrix.build-mode }} - config: | - paths-ignore: - # Third-party packages (vendored from external sources) - - 'package/@stackframe/**' - - 'node_modules/**' - - '**/node_modules/**' + config-file: ./.github/codeql/codeql-config.yml # If you wish to specify custom queries, you can do so here or in a config file. # By default, queries listed here will override any specified in a config file. # Prefix the list here with "+" to use these queries and those in the config file. diff --git a/backend/app/service/chat_service.py b/backend/app/service/chat_service.py index dce9c8e09..f0943e3b4 100644 --- a/backend/app/service/chat_service.py +++ b/backend/app/service/chat_service.py @@ -15,7 +15,6 @@ import asyncio import datetime import logging -import os import platform from pathlib import Path from typing import Any @@ -57,7 +56,7 @@ set_current_task_id, ) from app.utils.event_loop_utils import set_main_event_loop -from app.utils.file_utils import get_working_directory +from app.utils.file_utils import get_working_directory, safe_list_directory from app.utils.server.sync_step import sync_step from app.utils.telemetry.workforce_metrics import WorkforceMetricsCallback from app.utils.workforce import Workforce @@ -91,41 +90,24 @@ def format_task_context( # Skip file listing if requested if not skip_files: working_directory = task_data.get("working_directory") - skip_ext = (".pyc", ".tmp") if working_directory: try: - if os.path.exists(working_directory): - generated_files = [] - for root, dirs, files in os.walk(working_directory): - dirs[:] = [ - d - for d in dirs - if not d.startswith(".") - and d - not in ["node_modules", "__pycache__", "venv"] - ] - for file in files: - if not file.startswith(".") and not file.endswith( - skip_ext - ): - file_path = os.path.join(root, file) - absolute_path = os.path.abspath(file_path) - - # Only add if not seen before - if ( - seen_files is None - or absolute_path not in seen_files - ): - generated_files.append(absolute_path) - if seen_files is not None: - seen_files.add(absolute_path) - - if generated_files: - context_parts.append( - "Generated Files from Previous Task:" - ) - for file_path in sorted(generated_files): - context_parts.append(f" - {file_path}") + generated_files = safe_list_directory( + working_directory, + base=working_directory, + skip_dirs={"node_modules", "__pycache__", "venv"}, + skip_extensions=(".pyc", ".tmp"), + skip_prefix=".", + ) + if seen_files is not None: + generated_files = [ + p for p in generated_files if p not in seen_files + ] + seen_files.update(generated_files) + if generated_files: + context_parts.append("Generated Files from Previous Task:") + for file_path in sorted(generated_files): + context_parts.append(f" - {file_path}") except Exception as e: logger.warning(f"Failed to collect generated files: {e}") @@ -171,31 +153,20 @@ def collect_previous_task_context( f"Previous Task Result:\n{previous_task_result}\n" ) - # Collect generated files from working directory + # Collect generated files from working directory (safe listing) try: - if os.path.exists(working_directory): - generated_files = [] - for root, dirs, files in os.walk(working_directory): - dirs[:] = [ - d - for d in dirs - if not d.startswith(".") - and d not in ["node_modules", "__pycache__", "venv"] - ] - skip_ext = (".pyc", ".tmp") - for file in files: - if not file.startswith(".") and not file.endswith( - skip_ext - ): - file_path = os.path.join(root, file) - absolute_path = os.path.abspath(file_path) - generated_files.append(absolute_path) - - if generated_files: - context_parts.append("Generated Files from Previous Task:") - for file_path in sorted(generated_files): - context_parts.append(f" - {file_path}") - context_parts.append("") + generated_files = safe_list_directory( + working_directory, + base=working_directory, + skip_dirs={"node_modules", "__pycache__", "venv"}, + skip_extensions=(".pyc", ".tmp"), + skip_prefix=".", + ) + if generated_files: + context_parts.append("Generated Files from Previous Task:") + for file_path in sorted(generated_files): + context_parts.append(f" - {file_path}") + context_parts.append("") except Exception as e: logger.warning(f"Failed to collect generated files: {e}") @@ -271,30 +242,21 @@ def build_conversation_context( context += f"Assistant: {entry['content']}\n\n" if working_directories: - all_generated_files = set() # Use set to avoid duplicates + all_generated_files: set[str] = set() for working_directory in working_directories: try: - if os.path.exists(working_directory): - for root, dirs, files in os.walk(working_directory): - dirs[:] = [ - d - for d in dirs - if not d.startswith(".") - and d - not in ["node_modules", "__pycache__", "venv"] - ] - for file in files: - if not file.startswith( - "." - ) and not file.endswith((".pyc", ".tmp")): - file_path = os.path.join(root, file) - absolute_path = os.path.abspath(file_path) - all_generated_files.add(absolute_path) + files_list = safe_list_directory( + working_directory, + base=working_directory, + skip_dirs={"node_modules", "__pycache__", "venv"}, + skip_extensions=(".pyc", ".tmp"), + skip_prefix=".", + ) + all_generated_files.update(files_list) except Exception as e: logger.warning( "Failed to collect generated " - f"files from {working_directory}" - f": {e}" + f"files from {working_directory}: {e}" ) if all_generated_files: diff --git a/backend/app/utils/file_utils.py b/backend/app/utils/file_utils.py index b5cc78796..2fd486c41 100644 --- a/backend/app/utils/file_utils.py +++ b/backend/app/utils/file_utils.py @@ -11,28 +11,340 @@ # See the License for the specific language governing permissions and # limitations under the License. # ========= Copyright 2025-2026 @ Eigent.ai All Rights Reserved. ========= -"""File system utilities.""" + +"""File system utilities with robust path handling and edge-case safety.""" + +import logging +import os +import platform +import tempfile +from collections.abc import Callable +from pathlib import Path from app.component.environment import env from app.model.chat import Chat +logger = logging.getLogger("file_utils") + +# Windows has a 260-character path limit unless long path support is enabled +MAX_PATH_LENGTH_WIN = 260 +MAX_PATH_LENGTH_UNIX = 4096 +DEFAULT_MAX_FILE_SIZE_READ = 10 * 1024 * 1024 # 10 MB +DEFAULT_ENCODING = "utf-8" +FALLBACK_ENCODINGS = ("utf-8", "utf-8-sig", "latin-1", "cp1252") + + +def _max_path_length() -> int: + """Return the platform-appropriate max path length for validation.""" + return ( + MAX_PATH_LENGTH_WIN + if platform.system() == "Windows" + else MAX_PATH_LENGTH_UNIX + ) + + +def safe_join_path(base: str, *parts: str) -> str | None: + """ + Join path parts to base and ensure the result is still under base (no traversal). + Returns None if the resolved path escapes base or is invalid. + """ + if not base or not base.strip(): + return None + try: + base_resolved = Path(base).resolve() + if not base_resolved.is_dir() and not base_resolved.exists(): + base_resolved = base_resolved.parent + combined = base_resolved + for p in parts: + if p is None or (isinstance(p, str) and ".." in p.split(os.sep)): + return None + combined = combined / p + resolved = combined.resolve() + try: + resolved.relative_to(base_resolved) + except ValueError: + return None + if len(str(resolved)) > _max_path_length(): + return None + return str(resolved) + except (OSError, RuntimeError) as e: + logger.debug("safe_join_path failed: %s", e) + return None + + +def is_safe_path(path: str, base: str) -> bool: + """ + Return True if path is under base (realpath) and within path length limits. + Handles None/empty and symlinks by resolving. + """ + if not path or not base: + return False + try: + base_real = os.path.realpath(base) + path_real = os.path.realpath(path) + if not path_real.startswith( + base_real.rstrip(os.sep) + os.sep + ) and path_real != base_real.rstrip(os.sep): + return False + return len(path_real) <= _max_path_length() + except (OSError, RuntimeError): + return False + + +def safe_resolve_path(path: str, base: str) -> str | None: + """ + Resolve path relative to base. If path is absolute, ensure it is under base. + Returns None if path escapes base, does not exist, or exceeds path length. + """ + if not path or not path.strip(): + return None + try: + base_abs = os.path.abspath(base) + if not os.path.isdir(base_abs): + base_abs = os.path.dirname(base_abs) + if os.path.isabs(path): + resolved = os.path.normpath(path) + else: + resolved = os.path.normpath(os.path.join(base_abs, path)) + resolved_real = os.path.realpath(resolved) + base_real = os.path.realpath(base_abs) + if not resolved_real.startswith( + base_real.rstrip(os.sep) + os.sep + ) and resolved_real != base_real.rstrip(os.sep): + logger.warning("Path escapes base: path=%r base=%r", path, base) + return None + if len(resolved_real) > _max_path_length(): + logger.warning("Path exceeds max length: %d", len(resolved_real)) + return None + return resolved_real + except (OSError, RuntimeError) as e: + logger.debug("safe_resolve_path failed: %s", e) + return None + + +def normalize_working_path(path: str | None) -> str: + """ + Normalize and validate a working directory path. + Returns a safe default (user home or cwd) if path is None, empty, or invalid. + """ + if not path or not str(path).strip(): + fallback = os.path.expanduser("~") + logger.debug("Empty working path, using fallback: %s", fallback) + return fallback + path = str(path).strip() + try: + resolved = os.path.abspath(os.path.expanduser(path)) + if len(resolved) > _max_path_length(): + logger.warning("Working path too long, using parent: %s", resolved) + resolved = str(Path(resolved).parent) + if not os.path.exists(resolved): + parent = os.path.dirname(resolved) + if parent and parent != resolved and os.path.isdir(parent): + return parent + return os.path.expanduser("~") + return ( + resolved if os.path.isdir(resolved) else str(Path(resolved).parent) + ) + except (OSError, RuntimeError) as e: + logger.warning("Invalid working path %r: %s", path, e) + return os.path.expanduser("~") + + +def safe_list_directory( + dir_path: str, + base: str | None = None, + *, + max_entries: int = 10_000, + skip_dirs: set[str] | None = None, + skip_extensions: tuple[str, ...] = (".pyc", ".tmp", ".temp"), + skip_prefix: str = ".", + follow_symlinks: bool = False, + path_filter: Callable[[str], bool] | None = None, +) -> list[str]: + """ + List files under dir_path with optional base confinement and filters. + If base is set, only returns paths that resolve under base (no traversal). + Returns list of absolute file paths; skips directories matching skip_dirs + and files starting with skip_prefix or ending with skip_extensions. + + dir_path is validated against base (or cwd when base is None) before use. + For CodeQL: only the trusted base path is used in path operations; we + validate dir_path is under base then list base (same as dir_path when + base equals dir_path, as in chat_service). + """ + if not dir_path or not dir_path.strip(): + return [] + resolve_base = base if base else os.getcwd() + # Validate dir_path is under base; do not use return value in path ops. + if safe_resolve_path(dir_path, resolve_base) is None: + logger.debug( + "safe_list_directory: dir_path not under base or invalid: %r", + dir_path, + ) + return [] + # Use only trusted base for path operations (no user-derived path in sinks). + base_real = os.path.realpath(resolve_base) + try: + if not os.path.isdir(base_real): + return [] + except OSError: + return [] + path_for_walk = base_real + skip_dirs = skip_dirs or { + ".git", + "node_modules", + "__pycache__", + "venv", + ".venv", + } + result: list[str] = [] + try: + for root, dirs, files in os.walk( + path_for_walk, followlinks=follow_symlinks + ): + dirs[:] = [ + d + for d in dirs + if d not in skip_dirs and not d.startswith(skip_prefix) + ] + for name in files: + if name.startswith(skip_prefix): + continue + if any(name.endswith(ext) for ext in skip_extensions): + continue + file_path = os.path.join(root, name) + try: + abs_path = os.path.abspath(file_path) + real_path = os.path.realpath(file_path) + if base_real and not ( + real_path.startswith(base_real.rstrip(os.sep) + os.sep) + or real_path == base_real.rstrip(os.sep) + ): + continue + if path_filter and not path_filter(abs_path): + continue + result.append(abs_path) + if len(result) >= max_entries: + logger.debug( + "safe_list_directory hit max_entries=%d", + max_entries, + ) + return result + except OSError: + continue + except OSError as e: + logger.warning("safe_list_directory failed for %r: %s", dir_path, e) + return result + + +def safe_read_file( + path: str, + base: str | None = None, + max_size: int = DEFAULT_MAX_FILE_SIZE_READ, + encoding: str = DEFAULT_ENCODING, +) -> str | None: + """ + Read file content with path confinement, size limit, and encoding fallback. + Returns None on path escape, OSError, or size exceed. + """ + if base and not is_safe_path(path, base): + logger.warning("safe_read_file: path not under base: %r", path) + return None + path_to_use = path + if base and not os.path.isabs(path): + joined = safe_join_path(base, path) + if joined is None: + return None + path_to_use = joined + if not os.path.isfile(path_to_use): + return None + try: + size = os.path.getsize(path_to_use) + if size > max_size: + logger.warning( + "safe_read_file: file too large %d > %d", size, max_size + ) + return None + for enc in (encoding,) + FALLBACK_ENCODINGS: + if enc == encoding and enc in FALLBACK_ENCODINGS: + continue + try: + with open(path_to_use, encoding=enc) as f: + return f.read() + except (UnicodeDecodeError, LookupError): + continue + return None + except OSError as e: + logger.debug("safe_read_file failed: %s", e) + return None + + +def safe_write_file( + path: str, + content: str, + base: str | None = None, + encoding: str = DEFAULT_ENCODING, + create_dirs: bool = True, +) -> bool: + """ + Write content to path with optional base confinement. + Returns False on path escape or OSError. + """ + if base and not os.path.isabs(path): + resolved = safe_resolve_path(path, base) + if resolved is None: + return False + path = resolved + elif base and not is_safe_path(path, base): + return False + try: + parent = os.path.dirname(path) + if parent and create_dirs and not os.path.isdir(parent): + os.makedirs(parent, exist_ok=True) + with open(path, "w", encoding=encoding) as f: + f.write(content) + return True + except OSError as e: + logger.warning("safe_write_file failed: %s", e) + return False + + +def create_temp_dir( + prefix: str = "eigent_", base: str | None = None +) -> str | None: + """ + Create a temporary directory. If base is set, it must exist and be a directory; + the temp dir will be created under base. Returns None on failure. + """ + try: + if base and os.path.isdir(base): + return tempfile.mkdtemp(prefix=prefix, dir=base) + return tempfile.mkdtemp(prefix=prefix) + except OSError as e: + logger.warning("create_temp_dir failed: %s", e) + return None + def get_working_directory(options: Chat, task_lock=None) -> str: """ Get the correct working directory for file operations. First checks if there's an updated path from improve API call, then falls back to environment variable or default path. + Result is normalized for safety (traversal, length, existence). """ if not task_lock: from app.service.task import get_task_lock_if_exists task_lock = get_task_lock_if_exists(options.project_id) + raw: str if ( task_lock and hasattr(task_lock, "new_folder_path") and task_lock.new_folder_path ): - return str(task_lock.new_folder_path) + raw = str(task_lock.new_folder_path) else: - return env("file_save_path", options.file_save_path()) + raw = env("file_save_path", options.file_save_path()) + + return normalize_working_path(raw)