diff --git a/desloppify/languages/_framework/generic_support/core.py b/desloppify/languages/_framework/generic_support/core.py index 1cb66154d..90eb46409 100644 --- a/desloppify/languages/_framework/generic_support/core.py +++ b/desloppify/languages/_framework/generic_support/core.py @@ -10,7 +10,7 @@ from typing import Any from desloppify.engine.policy.zones import ZoneRule -from desloppify.languages._framework.base.types import LangConfig +from desloppify.languages._framework.base.types import DetectorPhase, LangConfig from .capabilities import ( SHARED_PHASE_LABELS, capability_report, @@ -60,6 +60,7 @@ def generic_lang( external_test_dirs: list[str] | None = None, test_file_extensions: list[str] | None = None, frameworks: bool = False, + custom_phases: list[DetectorPhase] | None = None, ) -> LangConfig: """Build and register a generic language plugin from tool specs. @@ -86,6 +87,7 @@ def generic_lang( entry_patterns=entry_patterns, external_test_dirs=external_test_dirs, test_file_extensions=test_file_extensions, + custom_phases=custom_phases, ) from desloppify.languages import register_generic_lang @@ -102,6 +104,7 @@ def generic_lang( has_treesitter=has_treesitter, extract_fn=extract_fn, dep_graph_fn=dep_graph_fn, + custom_phases=opts.custom_phases, ) cfg = LangConfig( diff --git a/desloppify/languages/_framework/generic_support/registration.py b/desloppify/languages/_framework/generic_support/registration.py index 85c6d3d0f..0faa74ce6 100644 --- a/desloppify/languages/_framework/generic_support/registration.py +++ b/desloppify/languages/_framework/generic_support/registration.py @@ -41,6 +41,7 @@ class GenericLangOptions: entry_patterns: list[str] | None = None external_test_dirs: list[str] | None = None test_file_extensions: list[str] | None = None + custom_phases: list[DetectorPhase] | None = None def _register_generic_tool_specs(tool_specs: list[dict[str, Any]]) -> dict[str, FixerConfig]: @@ -106,6 +107,7 @@ def _build_generic_phases( has_treesitter: bool, extract_fn, dep_graph_fn, + custom_phases: list[DetectorPhase] | None = None, ) -> list[DetectorPhase]: from desloppify.languages._framework.base.phase_builders import ( detector_phase_security, @@ -141,6 +143,9 @@ def _build_generic_phases( phases.append(_make_coupling_phase(dep_graph_fn)) phases.append(detector_phase_test_coverage()) + if custom_phases: + phases.extend(custom_phases) + phases.extend(shared_subjective_duplicates_tail()) return phases diff --git a/desloppify/languages/r/__init__.py b/desloppify/languages/r/__init__.py index 3a28cf1ed..3a9bf2082 100644 --- a/desloppify/languages/r/__init__.py +++ b/desloppify/languages/r/__init__.py @@ -1,7 +1,9 @@ -"""R language plugin — Jarl, lintr + tree-sitter.""" +"""R language plugin — Jarl, lintr + tree-sitter + R-specific smells.""" +from desloppify.languages._framework.base.types import DetectorPhase from desloppify.languages._framework.generic_support.core import generic_lang from desloppify.languages._framework.treesitter import R_SPEC +from desloppify.languages.r.phases_smells import phase_smells generic_lang( name="r", @@ -31,6 +33,9 @@ detect_markers=["DESCRIPTION", ".Rproj"], default_src="R", treesitter_spec=R_SPEC, + custom_phases=[ + DetectorPhase("R code smells", phase_smells), + ], ) __all__ = [ diff --git a/desloppify/languages/r/detectors/__init__.py b/desloppify/languages/r/detectors/__init__.py new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/desloppify/languages/r/detectors/__init__.py @@ -0,0 +1 @@ + diff --git a/desloppify/languages/r/detectors/smells.py b/desloppify/languages/r/detectors/smells.py new file mode 100644 index 000000000..69dbca314 --- /dev/null +++ b/desloppify/languages/r/detectors/smells.py @@ -0,0 +1,344 @@ +"""R code smell detection.""" + +from __future__ import annotations + +import logging +import re +from pathlib import Path + +from desloppify.base.discovery.source import find_source_files +from .smells_catalog import R_SMELL_CHECKS, SEVERITY_ORDER + +logger = logging.getLogger(__name__) + +_R_STRING_RE = re.compile(r'"(?:[^"\\]|\\.)*"|\'(?:[^\'\\]|\\.)*\'') +_R_COMMENT_RE = re.compile(r"#[^\n]*") +_FUNCTION_DEF_RE = re.compile( + r"(?m)^\s*\w+\s*<-\s*function\s*\(", +) +_LIBRARY_IN_FN_RE = re.compile( + r"(? str: + """Remove R comments while preserving string literals. + + Replaces string contents with placeholders, strips comments, then restores strings. + """ + strings: list[str] = [] + + def replace_string(match: re.Match) -> str: + strings.append(match.group(0)) + return f"__STRING_{len(strings) - 1}__" + + content = _R_STRING_RE.sub(replace_string, content) + content = _R_COMMENT_RE.sub("", content) + + for i, s in enumerate(strings): + content = content.replace(f"__STRING_{i}__", s) + + return content + + +def _line_number(content: str, offset: int) -> int: + """Convert character offset to 1-based line number.""" + return content[:offset].count("\n") + 1 + + +def _line_preview(content: str, line_number: int) -> str: + """Get stripped preview of a specific line.""" + lines = content.splitlines() + if 1 <= line_number <= len(lines): + return lines[line_number - 1].strip()[:100] + return "" + + +def detect_smells(path: Path) -> tuple[list[dict], int]: + """Detect R-specific code smell patterns across source files. + + Returns (entries, total_files_checked). + """ + smell_counts: dict[str, list[dict]] = { + check["id"]: [] for check in R_SMELL_CHECKS + } + total_files = 0 + + for filepath in _find_r_files(path): + total_files += 1 + content = _read_file(filepath) + if content is None: + continue + + stripped = _strip_r_comments(content) + _scan_pattern_smells(filepath, content, stripped, smell_counts) + _detect_library_in_function(filepath, content, stripped, smell_counts) + _detect_lowercase_extension(filepath, content, stripped, smell_counts) + _detect_unnecessary_return(filepath, content, stripped, smell_counts) + + entries: list[dict] = [] + for check in R_SMELL_CHECKS: + matches = smell_counts[check["id"]] + if not matches: + continue + entries.append( + { + "id": check["id"], + "label": check["label"], + "severity": check["severity"], + "count": len(matches), + "files": len({m["file"] for m in matches}), + "matches": matches[:50], + } + ) + entries.sort( + key=lambda e: (SEVERITY_ORDER.get(e["severity"], 9), -e["count"]) + ) + return entries, total_files + + +def _scan_pattern_smells( + filepath: str, + raw_content: str, + stripped_content: str, + smell_counts: dict[str, list[dict]], +) -> None: + """Run regex-based smell checks for one file.""" + for check in R_SMELL_CHECKS: + pattern = check.get("pattern") + if pattern is None: + continue + for match in re.finditer(pattern, stripped_content): + line = _line_number(stripped_content, match.start()) + smell_counts[check["id"]].append( + { + "file": filepath, + "line": line, + "content": _line_preview(raw_content, line), + } + ) + + +def _detect_library_in_function( + filepath: str, + raw_content: str, + stripped_content: str, + smell_counts: dict[str, list[dict]], +) -> None: + """Detect library()/require() calls inside function bodies using tree-sitter.""" + if _LIBRARY_IN_FN_CHECK_ID not in smell_counts: + return + + if not _TS_AVAILABLE: + _detect_library_in_function_fallback(filepath, raw_content, stripped_content, smell_counts) + return + + try: + parser, language = _get_parser(R_SPEC.grammar) + cached = get_or_parse_tree(filepath, parser, R_SPEC.grammar) + if cached is None: + return + source, tree = cached + + # R function query captures function body as @body + from tree_sitter import Query, QueryCursor + fn_query = Query(language, R_SPEC.function_query) + cursor = QueryCursor(fn_query) + fn_matches = cursor.matches(tree.root_node) + + # Track seen lines to avoid duplicates in nested functions + seen_lines: set[int] = set() + + for _pattern_idx, captures in fn_matches: + body_node = captures.get("body") + if not body_node: + continue + body_node = body_node[0] if isinstance(body_node, list) else body_node + + # Walk the function body looking for library/require calls + _find_library_calls_in_node(body_node, source, filepath, raw_content, smell_counts, seen_lines) + + except Exception as exc: + logger.debug("tree-sitter parse failed for %s: %s", filepath, exc) + _detect_library_in_function_fallback(filepath, raw_content, stripped_content, smell_counts) + + +def _find_library_calls_in_node(node, source: bytes, filepath: str, raw_content: str, smell_counts: dict[str, list[dict]], seen_lines: set[int]) -> None: + """Recursively find library()/require() calls inside a node.""" + stack = [node] + while stack: + current = stack.pop() + + # Check if this is a call to library() or require() + if current.type == "call": + fn_child = current.child_by_field_name("function") + if fn_child and fn_child.type == "identifier": + fn_name = source[fn_child.start_byte:fn_child.end_byte] + if isinstance(fn_name, bytes): + fn_name = fn_name.decode("utf-8", errors="replace") + if fn_name in ("library", "require"): + line = current.start_point[0] + 1 + # Deduplicate by line number (nested functions can cause duplicates) + if line not in seen_lines: + seen_lines.add(line) + smell_counts[_LIBRARY_IN_FN_CHECK_ID].append( + { + "file": filepath, + "line": line, + "content": _line_preview(raw_content, line), + } + ) + + # Recurse into children + for child in current.children: + stack.append(child) + + +def _detect_library_in_function_fallback( + filepath: str, + raw_content: str, + stripped_content: str, + smell_counts: dict[str, list[dict]], +) -> None: + """Fallback regex-based detection when tree-sitter is unavailable. + + Uses a simple heuristic: track function definitions and their brace depth. + Only braces that appear after 'function(' on the same line or shortly after + are considered function body braces. + """ + # Join content to handle multi-line function definitions + content = stripped_content + + # Find all function definitions and their brace scopes + fn_ranges: list[tuple[int, int]] = [] # (start_pos, end_pos) in content + + for match in _FUNCTION_DEF_RE.finditer(content): + start = match.start() + # Find the opening brace of the function body + brace_start = content.find("{", start) + if brace_start == -1: + continue + + # Find matching closing brace + depth = 1 + pos = brace_start + 1 + while pos < len(content) and depth > 0: + if content[pos] == "{": + depth += 1 + elif content[pos] == "}": + depth -= 1 + pos += 1 + + fn_ranges.append((brace_start, pos)) + + # Check each library/require call to see if it's inside a function + for match in _LIBRARY_IN_FN_RE.finditer(content): + pos = match.start() + for start, end in fn_ranges: + if start < pos < end: + line = content[:pos].count("\n") + 1 + smell_counts[_LIBRARY_IN_FN_CHECK_ID].append( + { + "file": filepath, + "line": line, + "content": _line_preview(raw_content, line), + } + ) + break + + +def _detect_lowercase_extension( + filepath: str, + _raw_content: str, + _stripped_content: str, + smell_counts: dict[str, list[dict]], +) -> None: + """Detect .r or .q file extensions (CRAN requires .R).""" + if _LOWERCASE_EXT_CHECK_ID not in smell_counts: + return + + if filepath.endswith(".r") or filepath.endswith(".q"): + smell_counts[_LOWERCASE_EXT_CHECK_ID].append( + { + "file": filepath, + "line": 0, + "content": filepath, + } + ) + + +def _detect_unnecessary_return( + filepath: str, + raw_content: str, + stripped_content: str, + smell_counts: dict[str, list[dict]], +) -> None: + """Detect unnecessary return() at end of functions.""" + if _UNNECESSARY_RETURN_CHECK_ID not in smell_counts: + return + + lines = stripped_content.splitlines() + # Track function depth and last non-empty line of each function + fn_depth = 0 + last_non_empty: dict[int, int] = {} # depth -> line number + + for i, line in enumerate(lines): + stripped = line.strip() + opens = line.count("{") - line.count("}") + + if opens > 0: + for _ in range(opens): + fn_depth += 1 + last_non_empty[fn_depth] = -1 + elif opens < 0: + for _ in range(-opens): + if fn_depth > 0: + # Check if last non-empty line was return() + last_line = last_non_empty.get(fn_depth, -1) + if last_line >= 0: + last_stripped = lines[last_line].strip() + if last_stripped.startswith("return(") and last_stripped.endswith(")"): + smell_counts[_UNNECESSARY_RETURN_CHECK_ID].append( + { + "file": filepath, + "line": last_line + 1, + "content": _line_preview(raw_content, last_line + 1), + } + ) + del last_non_empty[fn_depth] + fn_depth = max(0, fn_depth - 1) + + if stripped and fn_depth > 0: + last_non_empty[fn_depth] = i + + +def _find_r_files(path: Path) -> list[str]: + """Find R source files using the framework's discovery system. + + Respects project-configured exclusion patterns. + """ + return find_source_files(str(path), [".R", ".r"]) + + +def _read_file(filepath: str) -> str | None: + """Read file content, returning None on error.""" + try: + return Path(filepath).read_text(encoding="utf-8") + except (OSError, UnicodeDecodeError): + return None + + +__all__ = ["detect_smells"] diff --git a/desloppify/languages/r/detectors/smells_catalog.py b/desloppify/languages/r/detectors/smells_catalog.py new file mode 100644 index 000000000..daee8eee0 --- /dev/null +++ b/desloppify/languages/r/detectors/smells_catalog.py @@ -0,0 +1,70 @@ +"""Catalog metadata for R smell checks.""" + +from __future__ import annotations + +R_SMELL_CHECKS = [ + { + "id": "setwd", + "label": "setwd() — non-portable working directory change", + "pattern": r"(? tuple[list, dict[str, int]]: + """Run R-specific smell detectors and normalize them into issues.""" + smell_entries, total_smell_files = detect_smells(path) + normalized_smells = normalize_smell_entries(smell_entries) + results = make_smell_issues( + [entry.to_mapping() for entry in normalized_smells], + log, + ) + return results, {"smells": total_smell_files} + + +__all__ = ["phase_smells"] diff --git a/desloppify/languages/r/tests/__init__.py b/desloppify/languages/r/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/desloppify/languages/r/tests/test_r_smells.py b/desloppify/languages/r/tests/test_r_smells.py new file mode 100644 index 000000000..fce1e41dc --- /dev/null +++ b/desloppify/languages/r/tests/test_r_smells.py @@ -0,0 +1,215 @@ +"""Tests for R-specific code smell detectors.""" + +from __future__ import annotations + +from pathlib import Path + +from desloppify.languages.r.detectors.smells import detect_smells + + +def _write(path: Path, rel_path: str, content: str) -> Path: + target = path / rel_path + target.parent.mkdir(parents=True, exist_ok=True) + target.write_text(content) + return target + + +def _entry(entries: list[dict], smell_id: str) -> dict: + return next(entry for entry in entries if entry["id"] == smell_id) + + +def test_detect_setwd(tmp_path): + _write(tmp_path, "R/script.R", 'setwd("/some/path")\n') + entries, total_files = detect_smells(tmp_path) + assert total_files == 1 + smell = _entry(entries, "setwd") + assert smell["count"] == 1 + + +def test_detect_global_assign(tmp_path): + _write(tmp_path, "R/script.R", "counter <<- 1\n") + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "global_assign") + assert smell["count"] == 1 + + +def test_detect_attach(tmp_path): + _write(tmp_path, "R/script.R", "attach(mtcars)\n") + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "attach") + assert smell["count"] == 1 + + +def test_detect_dangerous_rm(tmp_path): + _write(tmp_path, "R/script.R", "rm(list = ls())\n") + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "dangerous_rm") + assert smell["count"] == 1 + + +def test_detect_browser_leftover(tmp_path): + _write(tmp_path, "R/script.R", "browser()\n") + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "browser_leftover") + assert smell["count"] == 1 + + +def test_detect_debug_leftover(tmp_path): + _write(tmp_path, "R/script.R", "debug(my_func)\n") + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "debug_leftover") + assert smell["count"] == 1 + + +def test_detect_t_f_ambiguous(tmp_path): + _write(tmp_path, "R/script.R", "x <- T\ny <- F\n") + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "t_f_ambiguous") + assert smell["count"] == 2 + + +def test_t_f_ignores_true_false(tmp_path): + _write(tmp_path, "R/script.R", "x <- TRUE\ny <- FALSE\n") + entries, _ = detect_smells(tmp_path) + ids = {e["id"] for e in entries} + assert "t_f_ambiguous" not in ids + + +def test_t_f_ignores_identifiers_containing_t_or_f(tmp_path): + _write(tmp_path, "R/script.R", "data <- transform(df)\n") + entries, _ = detect_smells(tmp_path) + ids = {e["id"] for e in entries} + assert "t_f_ambiguous" not in ids + + +def test_detect_one_to_n(tmp_path): + _write(tmp_path, "R/script.R", "for (i in 1:n()) {\n}\n") + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "one_to_n") + assert smell["count"] == 1 + + +def test_detect_strings_as_factors(tmp_path): + _write(tmp_path, "R/script.R", 'options(stringsAsFactors = FALSE)\n') + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "strings_as_factors") + assert smell["count"] == 1 + + +def test_detect_library_in_function(tmp_path): + _write( + tmp_path, + "R/script.R", + "my_func <- function(x) {\n library(dplyr)\n x %>% mutate(y = 1)\n}\n", + ) + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "library_in_function") + assert smell["count"] == 1 + assert smell["matches"][0]["line"] == 2 + + +def test_library_at_top_level_is_not_flagged(tmp_path): + _write(tmp_path, "R/script.R", "library(dplyr)\n\nx <- 1\n") + entries, _ = detect_smells(tmp_path) + ids = {e["id"] for e in entries} + assert "library_in_function" not in ids + + +def test_no_smells_in_clean_code(tmp_path): + _write( + tmp_path, + "R/script.R", + 'library(dplyr)\n\nadd <- function(a, b) {\n a + b\n}\n', + ) + entries, total_files = detect_smells(tmp_path) + assert total_files == 1 + assert entries == [] + + +def test_skips_excluded_dirs(tmp_path): + """Test that exclusions work via framework's discovery system.""" + _write(tmp_path, "renv/library/script.R", "setwd('.')\n") + _write(tmp_path, "R/clean.R", "x <- 1\n") + + # With framework discovery, we need to set exclusions via runtime context + from desloppify.base.discovery.source import set_exclusions + set_exclusions(["renv/**"]) + + try: + entries, total_files = detect_smells(tmp_path) + assert total_files == 1 + finally: + # Reset exclusions + set_exclusions([]) + + +def test_strips_comments_before_matching(tmp_path): + _write(tmp_path, "R/script.R", "# setwd is bad\nx <- 1\n") + entries, _ = detect_smells(tmp_path) + ids = {e["id"] for e in entries} + assert "setwd" not in ids + + +def test_multiple_smells_in_one_file(tmp_path): + _write( + tmp_path, + "R/script.R", + 'setwd("/tmp")\ncounter <<- 0\nbrowser()\nT\n', + ) + entries, total_files = detect_smells(tmp_path) + ids = {e["id"] for e in entries} + assert total_files == 1 + assert "setwd" in ids + assert "global_assign" in ids + assert "browser_leftover" in ids + assert "t_f_ambiguous" in ids + + +def test_handles_nonexistent_files_gracefully(tmp_path): + entries, total_files = detect_smells(tmp_path) + assert total_files == 0 + assert entries == [] + + +def test_hash_in_string_not_stripped(tmp_path): + """Ensure # inside string literals is not treated as comment.""" + _write(tmp_path, "R/script.R", 'x <- "value #1"\n') + entries, _ = detect_smells(tmp_path) + ids = {e["id"] for e in entries} + assert "setwd" not in ids + + +def test_library_in_if_block_at_top_level_not_flagged(tmp_path): + """Ensure library() in non-function braces at top level is not flagged.""" + _write( + tmp_path, + "R/script.R", + "if (TRUE) {\n library(dplyr)\n}\n", + ) + entries, _ = detect_smells(tmp_path) + ids = {e["id"] for e in entries} + assert "library_in_function" not in ids + + +def test_library_in_for_loop_at_top_level_not_flagged(tmp_path): + """Ensure library() in for/while at top level is not flagged.""" + _write( + tmp_path, + "R/script.R", + "for (i in 1:10) {\n library(dplyr)\n}\n", + ) + entries, _ = detect_smells(tmp_path) + ids = {e["id"] for e in entries} + assert "library_in_function" not in ids + + +def test_nested_function_with_library(tmp_path): + """Ensure library() in nested function is detected.""" + _write( + tmp_path, + "R/script.R", + "outer <- function() {\n inner <- function() {\n library(dplyr)\n }\n}\n", + ) + entries, _ = detect_smells(tmp_path) + smell = _entry(entries, "library_in_function") + assert smell["count"] == 1