From 7473d88b4c6ebf581cef1984826601ea82fd722d Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Tue, 15 Nov 2022 17:19:54 +0000 Subject: [PATCH] Add a new marker to check for memory leaks Users have indicated that it will be very useful if the plugin exposes a way to detect memory leaks in tests. This is possible, but is a bit tricky as the interpreter can allocate memory for internal caches, as well as user functions. To make this more reliable, the new marker will take two parameters: * The watermark of memory to ignore. If the memory leaked by the test is higher than this value, the test will fail and it will pass otherwise. * The number of warmup runs. This allows to run the test multiple times (assuming it passes) before actually checking for leaks. This allows to warmup user and interpreter caches. Signed-off-by: Pablo Galindo --- docs/usage.rst | 84 ++++++++++++ src/__init__.py | 0 src/pytest_memray/cpython_caches.py | 194 ++++++++++++++++++++++++++++ src/pytest_memray/marks.py | 131 ++++++++++++++++--- src/pytest_memray/plugin.py | 92 +++++++++++-- tests/test_pytest_memray.py | 112 ++++++++++++++++ 6 files changed, 586 insertions(+), 27 deletions(-) create mode 100644 src/__init__.py create mode 100644 src/pytest_memray/cpython_caches.py diff --git a/docs/usage.rst b/docs/usage.rst index a409335..b138eba 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -40,6 +40,10 @@ validations on tests when this plugin is enabled. ``limit_memory`` ---------------- +.. py:function:: limit_memory(memory_limit: str) + + Fail the execution of the test if the test allocates more memory than allowed + When this marker is applied to a test, it will cause the test to fail if the execution of the test allocates more memory than allowed. It takes a single argument with a string indicating the maximum memory that the test can allocate. 
@@ -61,3 +65,83 @@ Example of usage: @pytest.mark.limit_memory("24 MB") def test_foobar(): pass # do some stuff that allocates memory + +``limit_leaks`` +--------------- + +.. py:function:: limit_leaks(max_leaked_memory: str, *, warmups: int=0) + + Fail the execution of the test if the test leaks more memory than allowed. + +.. important:: + + To detect leaks, Memray needs to intercept calls to the Python allocators. + This adds significant overhead, and will slow your test down. + Additionally, if the ``warmups`` argument is provided, the test will be run + multiple times before the final execution, which will also increase the + total time it takes to complete the test run. + +When this marker is applied to a test, it will cause the test to fail if the execution +of the test leaks more memory than allowed. It takes a single positional argument with a +string indicating the maximum memory that the test is allowed to leak. + +Leaks are defined as memory that is allocated **in the marked test** +that is not freed before leaving the test body. + +The format for the string is ``<NUMBER> ([KMGTP]B|B)``. The marker will raise +``ValueError`` if the string format cannot be parsed correctly. + +The marker also takes an optional keyword-only argument ``warmups``. This argument +indicates the number of times **the test body (and not any of the fixtures)** +will be executed before the memory leak check is performed. This is useful to +avoid false positives where memory allocated below the test is cached and +survives the test run, as well as to account for memory allocated (and not +released) by the interpreter under normal execution. + +.. tip:: + + You can pass the ``--memray-bin-path`` argument to ``pytest`` to specify + a directory where Memray will store the binary files with the results. You + can then use the ``memray`` CLI to further investigate the allocations and the + leaks using any Memray reporters you'd like. Check `the memray docs + <https://bloomberg.github.io/memray/>`_ for more + information. 
+ +Example of usage: + +.. code-block:: python + + @pytest.mark.limit_leaks("1 MB", warmups=3) + def test_foobar(): + pass # do some stuff that cannot leak memory + +.. warning:: + + It is **very** challenging to write tests that do not "leak" memory in some way. + This marker is intended to be used to detect leaks in very simple unit tests + and can be challenging to use reliably when applied to integration tests or + more complex scenarios. Also, ``pytest`` itself does things that will be interpreted + as leaking memory, for example: + + * ``pytest`` captures stdout/stderr by default and will not release the + memory associated with these strings until the test ends. This will be + interpreted as a leak. + * ``pytest`` captures logging records by default and will not release the + memory associated with these strings until the test ends. This will be + interpreted as a leak. + * Memory allocated **in the test body** from fixtures that is only + released at fixture teardown time will be interpreted as a leak + because the plugin cannot see the fixtures being deallocated. + * The Python interpreter has some internal caches that will not be cleared + when the test ends. The plugin does its best to warm up all available + interpreter caches but there are some that cannot be correctly detected so + you may need to allow some small amount of leaked memory or use the + ``warmups`` argument to avoid false positive leak reports caused by + objects that the interpreter plans to reuse later. These caches are + implementation details of the interpreter, so the amount of memory + allocated, the location of the allocation, and the allocator that was used + can all change from one Python version to another. + + For this reason, using this marker with "0B" as the maximum allowed leaked memory + in tests that are not very simple unit tests will almost always require a + nonzero value for the ``warmups`` argument and sometimes that will not be enough. 
diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pytest_memray/cpython_caches.py b/src/pytest_memray/cpython_caches.py new file mode 100644 index 0000000..42caea6 --- /dev/null +++ b/src/pytest_memray/cpython_caches.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import logging +import contextlib +import sys +import warnings +from inspect import isabstract +from typing import Any +from typing import Generator +from typing import Set +from typing import Tuple + +try: + from _abc import _get_dump +except ImportError: + import weakref + + def _get_dump(cls: Any) -> Tuple[Set[Any], Any, Any, Any]: + # Reimplement _get_dump() for pure-Python implementation of + # the abc module (Lib/_py_abc.py) + registry_weakrefs = set(weakref.ref(obj) for obj in cls._abc_registry) + return ( + registry_weakrefs, + cls._abc_cache, + cls._abc_negative_cache, + cls._abc_negative_cache_version, + ) + + +@contextlib.contextmanager +def warmup_bytecode_cache() -> Generator[None, None, None]: + prev_prov = sys.getprofile() + sys.setprofile(lambda *args: args) + try: + yield + finally: + sys.setprofile(prev_prov) + + +def clean_interpreter_caches() -> None: + # Clear the warnings registry, so they can be displayed again + for mod in sys.modules.values(): + if hasattr(mod, "__warningregistry__"): + del mod.__warningregistry__ + + # Flush standard output, so that buffered data is sent to the OS and + # associated Python objects are reclaimed. 
+ for stream in (sys.stdout, sys.stderr, sys.__stdout__, sys.__stderr__): + if stream is not None: + stream.flush() + + with contextlib.suppress(KeyError): + re = sys.modules["re"] + re.purge() + + with contextlib.suppress(KeyError): + _strptime = sys.modules["_strptime"] + _strptime._regex_cache.clear() + + with contextlib.suppress(KeyError): + urllib_parse = sys.modules["urllib.parse"] + urllib_parse.clear_cache() + + with contextlib.suppress(KeyError): + urllib_request = sys.modules["urllib.request"] + urllib_request.urlcleanup() + + with contextlib.suppress(KeyError): + linecache = sys.modules["linecache"] + linecache.clearcache() + + with contextlib.suppress(KeyError): + mimetypes = sys.modules["mimetypes"] + mimetypes._default_mime_types() + + with contextlib.suppress(KeyError): + filecmp = sys.modules["filecmp"] + filecmp._cache.clear() + + with contextlib.suppress(KeyError): + struct = sys.modules["struct"] + struct._clearcache() + + with contextlib.suppress(KeyError): + doctest = sys.modules["doctest"] + doctest.master = None # type: ignore + + with contextlib.suppress(KeyError): + ctypes = sys.modules["ctypes"] + ctypes._reset_cache() + + with contextlib.suppress(KeyError): + typing = sys.modules["typing"] + for cleanup_fn in typing._cleanups: + cleanup_fn() + + with contextlib.suppress(KeyError, AttributeError): + fractions = sys.modules["fractions"] + fractions._hash_algorithm.cache_clear() + + sys._clear_type_cache() + + +def warm_caches() -> None: + + # Run the clean_interpreter_caches() function under + # a dummy tracer to warmup the bytecode cache so it + # doesn't interfere when called under tracing. 
+ with warmup_bytecode_cache(): + clean_interpreter_caches() + + # char cache + s = bytes(range(256)) + for i in range(256): + s[i : i + 1] # noqa + # unicode cache + [chr(i) for i in range(256)] + # int cache + list(range(-5, 257)) + + +@contextlib.contextmanager +def interpreter_cache_context() -> Generator[None, None, None]: + import collections.abc + import copyreg + + fs = warnings.filters[:] + ps = copyreg.dispatch_table.copy() + pic = sys.path_importer_cache.copy() + try: + import zipimport + except ImportError: + zdc = None # Run unmodified on platforms without zipimport support + else: + zdc = zipimport._zip_directory_cache.copy() # type: ignore + abcs = {} + for abc in [getattr(collections.abc, a) for a in collections.abc.__all__]: + if not isabstract(abc): + continue + for obj in abc.__subclasses__() + [abc]: + abcs[obj] = _get_dump(obj)[0] + + warm_caches() + + yield + + # Restore some original values. + warnings.filters[:] = fs # type: ignore + copyreg.dispatch_table.clear() + copyreg.dispatch_table.update(ps) + sys.path_importer_cache.clear() + sys.path_importer_cache.update(pic) + try: + import zipimport + except ImportError: + pass # Run unmodified on platforms without zipimport support + else: + zipimport._zip_directory_cache.clear() # type: ignore + zipimport._zip_directory_cache.update(zdc) # type: ignore + + # Clear ABC registries, restoring previously saved ABC registries. + abs_classes = [getattr(collections.abc, a) for a in collections.abc.__all__] + for abc in filter(isabstract, abs_classes): + for obj in abc.__subclasses__() + [abc]: + for ref in abcs.get(obj, set()): + if ref() is not None: + obj.register(ref()) + obj._abc_caches_clear() + + +@contextlib.contextmanager +def all_logging_disabled( + highest_level: int = logging.CRITICAL, +) -> Generator[None, None, None]: + """ + A context manager that will prevent any logging messages + triggered during the body from being processed. + :param highest_level: the maximum logging level in use. 
+ This would only need to be changed if a custom level greater than CRITICAL + is defined. + """ + # two kind-of hacks here: + # * can't get the highest logging level in effect => delegate to the user + # * can't get the current module-level override => use an undocumented + # (but non-private!) interface + + previous_level = logging.root.manager.disable + + logging.disable(highest_level) + + try: + yield + finally: + logging.disable(previous_level) diff --git a/src/pytest_memray/marks.py b/src/pytest_memray/marks.py index 2faa30d..99cd3ed 100644 --- a/src/pytest_memray/marks.py +++ b/src/pytest_memray/marks.py @@ -1,12 +1,22 @@ from __future__ import annotations from dataclasses import dataclass +from gc import collect +from pathlib import Path +from typing import Any +from typing import Callable from typing import Tuple from typing import cast from memray import AllocationRecord from pytest import Config +from memray import FileReader +from memray import Tracker +from .cpython_caches import clean_interpreter_caches +from .cpython_caches import interpreter_cache_context +from .cpython_caches import warmup_bytecode_cache +from .cpython_caches import all_logging_disabled from .utils import parse_memory_string from .utils import sizeof_fmt from .utils import value_or_ini @@ -16,7 +26,7 @@ @dataclass class _MemoryInfo: - """Type that holds all memray-related info for a failed test.""" + """Type that holds memory-related info for a failed test.""" max_memory: float total_allocated_memory: int @@ -24,14 +34,9 @@ class _MemoryInfo: num_stacks: int native_stacks: bool - @property - def section(self) -> PytestSection: - """Return a tuple in the format expected by section reporters.""" - total_memory_str = sizeof_fmt(self.total_allocated_memory) - max_memory_str = sizeof_fmt(self.max_memory) + def _generate_section_text(self, limit_text: str, header_text: str) -> str: text_lines = [ - f"Test is using {total_memory_str} out of limit of {max_memory_str}", - "List of 
allocations: ", + f"{header_text} {sizeof_fmt(self.total_allocated_memory)} out of limit of {sizeof_fmt(self.max_memory)}" ] for record in self.allocations: size = record.size @@ -50,32 +55,128 @@ def section(self) -> PytestSection: break text_lines.append(f"{padding*2}{function}:{file}:{line}") stacks_left -= 1 + return "\n".join(text_lines) + + @property + def section(self) -> PytestSection: + """Return a tuple in the format expected by section reporters.""" + return ( + "memray-max-memory", + self._generate_section_text("Test is using", "List of allocations:"), + ) + + @property + def long_repr(self) -> str: + """Generate a longrepr user-facing error message.""" + return f"Test was limited to {sizeof_fmt(self.max_memory)} but allocated {sizeof_fmt(self.total_allocated_memory)}" - return "memray-max-memory", "\n".join(text_lines) + +@dataclass +class _LeakedInfo(_MemoryInfo): + """Type that holds leaked memory-related info for a failed test.""" + + @property + def section(self) -> PytestSection: + """Return a tuple in the format expected by section reporters.""" + return ( + "memray-leaked-memory", + self._generate_section_text("Test leaked", "List of leaked allocations:"), + ) @property def long_repr(self) -> str: """Generate a longrepr user-facing error message.""" - total_memory_str = sizeof_fmt(self.total_allocated_memory) - max_memory_str = sizeof_fmt(self.max_memory) - return f"Test was limited to {max_memory_str} but allocated {total_memory_str}" + return f"Test was allowed to leak {sizeof_fmt(self.max_memory)} but leaked {sizeof_fmt(self.total_allocated_memory)}" def limit_memory( - limit: str, *, _allocations: list[AllocationRecord], _config: Config + limit: str, + *, + _results_file: Path, + _allocations: list[AllocationRecord], + _config: Config, ) -> _MemoryInfo | None: """Limit memory used by the test.""" + reader = FileReader(_results_file) + allocations = list( + (reader.get_high_watermark_allocation_records(merge_threads=True)) + ) max_memory = 
parse_memory_string(limit) - total_allocated_memory = sum(record.size for record in _allocations) + total_allocated_memory = sum(record.size for record in allocations) if total_allocated_memory < max_memory: return None num_stacks: int = cast(int, value_or_ini(_config, "stacks")) native_stacks: bool = cast(bool, value_or_ini(_config, "native")) return _MemoryInfo( - max_memory, total_allocated_memory, _allocations, num_stacks, native_stacks + max_memory, total_allocated_memory, allocations, num_stacks, native_stacks + ) + + +def limit_leaks_runner( + func: Callable[[], Any], + results_file: Path, + *marker_args: Any, + warmups: int = 1, + **marker_kwargs: Any, +) -> Any: + result = None + + def _memray_call_function() -> Any: + return func() + + with interpreter_cache_context(): + with warmup_bytecode_cache(): + for _ in range(warmups): + func() + + with all_logging_disabled(): + with Tracker( + results_file, trace_python_allocators=True, native_traces=True + ): + result = _memray_call_function() + collect() # GC any cycles created by the test. + clean_interpreter_caches() + collect() # GC any remaining cycles created by clean_interpreter_caches. 
+ + return result + + +def limit_leaks( + limit: str, *, _results_file: Path, _config: Config, **kwargs: Any +) -> _LeakedInfo | None: + reader = FileReader(_results_file) + main_thread = reader.metadata.pid + + def _is_good_allocation(allocation: AllocationRecord) -> bool: + (func, _, _), *_ = allocation.hybrid_stack_trace(max_stacks=1) + if "_PyCode_CreateLineArray" in func: + return False + if allocation.tid in {1, main_thread}: + return any( + "_memray_call_function" == pyfunc + for (pyfunc, _, _) in allocation.stack_trace() + ) + return True + + allocations = list( + filter( + _is_good_allocation, + reader.get_leaked_allocation_records(merge_threads=False), + ) + ) + + max_memory = parse_memory_string(limit) + total_leaked_memory = sum(record.size for record in allocations) + if total_leaked_memory <= max_memory: + return None + num_stacks: int = cast(int, value_or_ini(_config, "stacks")) + native_stacks: bool = cast(bool, value_or_ini(_config, "native")) + return _LeakedInfo( + max_memory, total_leaked_memory, allocations, num_stacks, native_stacks ) __all__ = [ "limit_memory", + "limit_leaks", ] diff --git a/src/pytest_memray/plugin.py b/src/pytest_memray/plugin.py index 1933388..afcc62f 100644 --- a/src/pytest_memray/plugin.py +++ b/src/pytest_memray/plugin.py @@ -12,11 +12,16 @@ from pathlib import Path from tempfile import TemporaryDirectory from typing import Any +from typing import Callable +from typing import Dict from typing import Generator from typing import Iterable from typing import List +from typing import Mapping +from typing import Protocol from typing import Tuple from typing import cast +from typing import Iterator from _pytest.terminal import TerminalReporter from memray import AllocationRecord @@ -33,13 +38,36 @@ from pytest import TestReport from pytest import hookimpl +from .marks import limit_leaks +from .marks import limit_leaks_runner from .marks import limit_memory from .utils import WriteEnabledDirectoryAction from .utils import 
positive_int from .utils import sizeof_fmt from .utils import value_or_ini -MARKERS = {"limit_memory": limit_memory} +TestFunction = Callable[[], Any] + + +class Runner(Protocol): + def __call__( + self, + func: TestFunction, + results_file: Path, + native: bool, + trace_python_allocators: bool, + *args: Any, + **kwargs: Any, + ) -> Any: + ... + + +MARKERS: Dict[str, Callable[..., Any]] = { + "limit_memory": limit_memory, + "limit_leaks": limit_leaks, +} +MARKERS_RUNNERS: Dict[str, Runner] = {"limit_leaks": limit_leaks_runner} +AUTOMATIC_MARKERS = {"limit_memory"} N_TOP_ALLOCS = 5 N_HISTOGRAM_BINS = 5 @@ -88,7 +116,9 @@ class Manager: def __init__(self, config: Config) -> None: self.results: dict[str, Result] = {} self.config = config - path: Path | None = config.getvalue("memray_bin_path") + the_path = config.getvalue("memray_bin_path") + assert the_path is None or isinstance(the_path, Path) + path: Path | None = the_path self._tmp_dir: None | TemporaryDirectory[str] = None if path is None: # Check the MEMRAY_RESULT_PATH environment variable. 
If this @@ -120,8 +150,24 @@ def pytest_unconfigure(self, config: Config) -> Generator[None, None, None]: if os.environ.get("MEMRAY_RESULT_PATH"): del os.environ["MEMRAY_RESULT_PATH"] + def _default_runner( + self, + func: Callable[[], Any], + results_file: Path, + native: bool, + trace_python_allocators: bool, + *args: Any, + **kwargs: Any, + ) -> Any: + with Tracker( + results_file, + native_traces=native, + trace_python_allocators=trace_python_allocators, + ): + return func() + @hookimpl(hookwrapper=True) # type: ignore[misc] # Untyped decorator - def pytest_pyfunc_call(self, pyfuncitem: Function) -> object | None: + def pytest_pyfunc_call(self, pyfuncitem: Function) -> Iterator[None]: func = pyfuncitem.obj markers = { @@ -130,10 +176,28 @@ def pytest_pyfunc_call(self, pyfuncitem: Function) -> object | None: if marker.name in MARKERS } - if not markers and not value_or_ini(self.config, "memray"): + if not ( + any(marker in markers for marker in AUTOMATIC_MARKERS) + or value_or_ini(self.config, "memray") + ): yield return + if len(markers) > 1: + raise ValueError( + "Only one memray marker can be applied at the same time to the same test" + ) + + runner: Runner = self._default_runner + marker_args: Tuple[Any, ...] 
= tuple() + marker_kwargs: Mapping[str, Any] = dict() + for marker in pyfuncitem.iter_markers(): + if marker.name in MARKERS_RUNNERS: + runner = MARKERS_RUNNERS[marker.name] + marker_args = marker.args + marker_kwargs = marker.kwargs + break + def _build_bin_path() -> Path: if self._tmp_dir is None and not os.getenv("MEMRAY_RESULT_PATH"): of_id = pyfuncitem.nodeid.replace("::", "-") @@ -153,15 +217,17 @@ def _build_bin_path() -> Path: @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> object | None: - test_result: object | Any = None try: result_file = _build_bin_path() - with Tracker( + runner_function = functools.partial(func, *args, **kwargs) + result: object = runner( + runner_function, result_file, - native_traces=native, - trace_python_allocators=trace_python_allocators, - ): - test_result = func(*args, **kwargs) + native, + trace_python_allocators, + *marker_args, + **marker_kwargs, + ) try: metadata = FileReader(result_file).metadata except OSError: @@ -180,7 +246,7 @@ def wrapper(*args: Any, **kwargs: Any) -> object | None: # hook again with whatever is here, which will cause the wrapper # to be wrapped again. 
pyfuncitem.obj = func - return test_result + return None pyfuncitem.obj = wrapper yield @@ -204,13 +270,15 @@ def pytest_runtest_makereport( result = self.results.get(item.nodeid) if not result: continue + reader = FileReader(result.result_file) func = reader.get_high_watermark_allocation_records - allocations = list((func(merge_threads=True))) + allocations = list(func(merge_threads=True)) res = marker_fn( *marker.args, **marker.kwargs, _allocations=allocations, + _results_file=result.result_file, _config=self.config, ) if res: diff --git a/tests/test_pytest_memray.py b/tests/test_pytest_memray.py index dd19afb..10d3775 100644 --- a/tests/test_pytest_memray.py +++ b/tests/test_pytest_memray.py @@ -594,3 +594,115 @@ def test_memory_alloc_fails(): ) result = pytester.runpytest("-Werror", "--memray") assert result.ret == ExitCode.OK + + +@pytest.mark.parametrize( + "size, outcome", + [ + (1024 * 1, ExitCode.OK), + (1024 * 1 + 1, ExitCode.TESTS_FAILED), + (1024 * 2, ExitCode.TESTS_FAILED), + (1024 * 1 - 1, ExitCode.OK), + (1024 * 0.5, ExitCode.OK), + ], +) +def test_leak_marker(pytester: Pytester, size: int, outcome: ExitCode) -> None: + pytester.makepyfile( + f""" + import pytest + from memray._test import MemoryAllocator + allocator = MemoryAllocator() + + @pytest.mark.limit_leaks("1KB", warmups=1) + def test_memory_alloc_fails(): + allocator.valloc({size}) + # No free call here + """ + ) + + result = pytester.runpytest("--memray") + + assert result.ret == outcome + + +def test_leak_marker_warmups(pytester: Pytester) -> None: + pytester.makepyfile( + """ + import pytest + import sys + n = 0 + + @pytest.mark.limit_leaks("1MB", warmups=10) + def test_the_thing(): + global n + n += 1 + print(f"Called {n} times!", file=sys.stderr) + """ + ) + + with patch("pytest_memray.marks.Tracker") as mock: + result = pytester.runpytest("--memray", "-s") + + assert result.ret == ExitCode.OK + mock.assert_called_once() + output = result.stderr.str() + assert "Called 11 times!" 
in output + + +def test_leak_marker_does_not_work_if_memray_inactive(pytester: Pytester) -> None: + pytester.makepyfile( + """ + import pytest + from memray._test import MemoryAllocator + allocator = MemoryAllocator() + + @pytest.mark.limit_leaks("0B", warmups=1) + def test_memory_alloc_fails(): + allocator.valloc(512) + # No free call here + """ + ) + + result = pytester.runpytest("") + + assert result.ret == ExitCode.OK + + +def test_multiple_markers_are_not_supported(pytester: Pytester) -> None: + pytester.makepyfile( + """ + import pytest + + @pytest.mark.limit_leaks("0MB") + @pytest.mark.limit_memory("0MB") + def test_bar(): + pass + """ + ) + + result = pytester.runpytest("--memray") + assert result.ret == ExitCode.TESTS_FAILED + + output = result.stdout.str() + assert "Only one memray marker can be applied" in output + + +def test_multiple_markers_are_not_supported_with_global_marker( + pytester: Pytester, +) -> None: + pytester.makepyfile( + """ + import pytest + pytestmark = pytest.mark.limit_memory("1 MB") + + @pytest.mark.limit_leaks("0MB") + def test_bar(): + pass + """ + ) + + result = pytester.runpytest("--memray") + assert result.ret == ExitCode.TESTS_FAILED + + output = result.stdout.str() + assert "Only one memray marker can be applied" in output