diff --git a/.idea/dictionaries/pavel.xml b/.idea/dictionaries/pavel.xml index 9d05c57..42b0157 100644 --- a/.idea/dictionaries/pavel.xml +++ b/.idea/dictionaries/pavel.xml @@ -230,8 +230,9 @@ weakref wireshark woah + wrkspc xbee zubax - \ No newline at end of file + diff --git a/appveyor.yml b/appveyor.yml index 9460544..d265d63 100644 --- a/appveyor.yml +++ b/appveyor.yml @@ -65,11 +65,11 @@ for: - python --version test_script: # GNU/Linux test. - - sh: nox --non-interactive --error-on-missing-interpreters --session test lint --python $PYTHON + - sh: nox --non-interactive --error-on-missing-interpreters --session test test_eol pristine lint --python $PYTHON # skip macOS docs build - sh: 'if [ "$APPVEYOR_BUILD_WORKER_IMAGE" != "macos" ]; then nox --non-interactive --session docs; fi' # MS Windows test. - - cmd: nox --forcecolor --non-interactive --error-on-missing-interpreters --session test lint + - cmd: nox --forcecolor --non-interactive --error-on-missing-interpreters --session test test_eol pristine lint # Shared test for all platforms. - git clone https://github.com/OpenCyphal/public_regulated_data_types .dsdl-test - python -c "import pydsdl; pydsdl.read_namespace('.dsdl-test/uavcan', [])" diff --git a/noxfile.py b/noxfile.py index 80a4504..04ab3d5 100644 --- a/noxfile.py +++ b/noxfile.py @@ -6,10 +6,11 @@ import os import shutil from pathlib import Path +from functools import partial import nox -PYTHONS = ["3.6", "3.7", "3.8", "3.9", "3.10", "3.11"] +PYTHONS = ["3.8", "3.9", "3.10", "3.11"] """The newest supported Python shall be listed LAST.""" nox.options.error_on_external_run = True @@ -48,9 +49,9 @@ def test(session): session.log("Using the newest supported Python: %s", is_latest_python(session)) session.install("-e", ".") session.install( - "pytest ~= 7.0", - "pytest-randomly ~= 3.10", - "coverage ~= 6.2", + "pytest ~= 7.3", + "pytest-randomly ~= 3.12", + "coverage ~= 7.2", ) session.run("coverage", "run", "-m", "pytest") session.run("coverage", "report", "--fail-under=95") @@ -60,12 +61,32 @@ def test(session): session.log(f"OPEN IN WEB BROWSER: file://{report_file}") +@nox.session(python=["3.6", "3.7"]) +def test_eol(session): + """This is a minimal test session for those old Pythons that have EOLed.""" + session.install("-e", ".") + session.install("pytest") + session.run("pytest") + + +@nox.session(python=PYTHONS) +def pristine(session): + """ + Install the library into a pristine environment and ensure that it is importable. + This is needed to catch errors caused by accidental reliance on test dependencies in the main codebase. + """ + exe = partial(session.run, "python", "-c", silent=True) + session.cd(session.create_tmp()) # Change the directory to reveal spurious dependencies from the project root. + session.install(f"{ROOT_DIR}") # Testing bare installation first. + exe("import pydsdl") + + @nox.session(python=PYTHONS, reuse_venv=True) def lint(session): session.log("Using the newest supported Python: %s", is_latest_python(session)) session.install( - "mypy == 0.942", - "pylint == 2.13.*", + "mypy ~= 1.2.0", + "pylint ~= 2.17.2", ) session.run( "mypy", @@ -84,7 +105,7 @@ def lint(session): }, ) if is_latest_python(session): - session.install("black == 22.*") + session.install("black ~= 23.3") session.run("black", "--check", ".") diff --git a/pydsdl/__init__.py b/pydsdl/__init__.py index 11ca69a..c0422a7 100644 --- a/pydsdl/__init__.py +++ b/pydsdl/__init__.py @@ -4,10 +4,10 @@ # pylint: disable=wrong-import-position -import os as _os import sys as _sys +from pathlib import Path as _Path -__version__ = "1.18.0" +__version__ = "1.19.0" __version_info__ = tuple(map(int, __version__.split(".")[:3])) __license__ = "MIT" __author__ = "OpenCyphal" @@ -22,7 +22,7 @@ # to import stuff dynamically after the initialization is finished (e.g., function-local imports won't be # able to reach the third-party stuff), but we don't care. _original_sys_path = _sys.path -_sys.path = [_os.path.join(_os.path.dirname(__file__), "third_party")] + _sys.path +_sys.path = [str(_Path(__file__).parent / "third_party")] + _sys.path # Never import anything that is not available here - API stability guarantees are only provided for the exposed items. from ._namespace import read_namespace as read_namespace diff --git a/pydsdl/_bit_length_set/_symbolic.py b/pydsdl/_bit_length_set/_symbolic.py index 07c250d..2dfaec2 100644 --- a/pydsdl/_bit_length_set/_symbolic.py +++ b/pydsdl/_bit_length_set/_symbolic.py @@ -132,7 +132,7 @@ def modulo(self, divisor: int) -> typing.Set[int]: mods = [ch.modulo(divisor) for ch in self._children] prod = itertools.product(*mods) sums = set(map(sum, prod)) - return {typing.cast(int, x) % divisor for x in sums} + return {x % divisor for x in sums} @property def min(self) -> int: diff --git a/pydsdl/_dsdl_definition.py b/pydsdl/_dsdl_definition.py index bda5eb4..a8114da 100644 --- a/pydsdl/_dsdl_definition.py +++ b/pydsdl/_dsdl_definition.py @@ -2,7 +2,6 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko -import os import time from typing import Iterable, Callable, Optional, List import logging @@ -41,21 +40,14 @@ def __init__(self, file_path: Path, root_namespace_path: Path): self._text = str(f.read()) # Checking the sanity of the root directory path - can't contain separators - if CompositeType.NAME_COMPONENT_SEPARATOR in os.path.split(self._root_namespace_path)[-1]: + if CompositeType.NAME_COMPONENT_SEPARATOR in self._root_namespace_path.name: raise FileNameFormatError("Invalid namespace name", path=self._root_namespace_path) # Determining the relative path within the root namespace directory - relative_path = str( - os.path.join( - os.path.split(self._root_namespace_path)[-1], - self._file_path.relative_to(self._root_namespace_path), - ) - ) - - relative_directory, basename = [str(x) for x in os.path.split(relative_path)] # type: str, str + relative_path = self._root_namespace_path.name / self._file_path.relative_to(self._root_namespace_path) # Parsing the basename, e.g., 434.GetTransportStatistics.0.1.dsdl - basename_components = basename.split(".")[:-1] + basename_components = relative_path.name.split(".")[:-1] str_fixed_port_id: Optional[str] = None if len(basename_components) == 4: str_fixed_port_id, short_name, str_major_version, str_minor_version = basename_components @@ -86,11 +78,10 @@ def __init__(self, file_path: Path, root_namespace_path: Path): raise FileNameFormatError("Could not parse the version numbers", path=self._file_path) from None # Finally, constructing the name - namespace_components = list(relative_directory.strip(os.sep).split(os.sep)) + namespace_components = list(relative_path.parent.parts) for nc in namespace_components: if CompositeType.NAME_COMPONENT_SEPARATOR in nc: raise FileNameFormatError(f"Invalid name for namespace component: {nc!r}", path=self._file_path) - self._name: str = CompositeType.NAME_COMPONENT_SEPARATOR.join(namespace_components + [str(short_name)]) self._cached_type: Optional[CompositeType] = None diff --git a/pydsdl/_namespace.py b/pydsdl/_namespace.py index f34225b..600408a 100644 --- a/pydsdl/_namespace.py +++ b/pydsdl/_namespace.py @@ -4,10 +4,8 @@ # pylint: disable=logging-not-lazy -import os from typing import Iterable, Callable, DefaultDict, List, Optional, Union, Set, Dict import logging -import fnmatch import collections from pathlib import Path from . import _serializable @@ -200,10 +198,8 @@ def read_namespace( return types -_DSDL_FILE_GLOBS = [ - "*.dsdl", # https://forum.opencyphal.org/t/uavcan-file-extension/438 - "*.uavcan", # Legacy name, not for new projects. -] +DSDL_FILE_GLOB = "*.dsdl" +DSDL_FILE_GLOB_LEGACY = "*.uavcan" _LOG_LIST_ITEM_PREFIX = " " * 4 _logger = logging.getLogger(__name__) @@ -259,18 +255,12 @@ def _ensure_no_name_collisions( for lu in lookup_definitions: lu_full_namespace_period = lu.full_namespace.lower() + "." lu_full_name_period = lu.full_name.lower() + "." - """ - This is to allow the following messages to coexist happily: - - zubax/noncolliding/iceberg/Ice.0.1.dsdl - zubax/noncolliding/Iceb.0.1.dsdl - - The following is still not allowed: - - zubax/colliding/iceberg/Ice.0.1.dsdl - zubax/colliding/Iceberg.0.1.dsdl - - """ + # This is to allow the following messages to coexist happily: + # zubax/non_colliding/iceberg/Ice.0.1.dsdl + # zubax/non_colliding/IceB.0.1.dsdl + # The following is still not allowed: + # zubax/colliding/iceberg/Ice.0.1.dsdl + # zubax/colliding/Iceberg.0.1.dsdl if tg.full_name != lu.full_name and tg.full_name.lower() == lu.full_name.lower(): raise DataTypeNameCollisionError( "Full name of this definition differs from %s only by letter case, " @@ -395,9 +385,6 @@ def _ensure_no_common_usage_errors( "dsdl", ] - def base(s: Path) -> str: - return str(os.path.basename(os.path.normpath(s))) - def is_valid_name(s: str) -> bool: try: _serializable.check_name(s) @@ -406,62 +393,65 @@ def is_valid_name(s: str) -> bool: else: return True - all_paths = set([root_namespace_directory] + list(lookup_directories)) + # resolve() will also normalize the case in case-insensitive filesystems. + all_paths = {root_namespace_directory.resolve()} | {x.resolve() for x in lookup_directories} for p in all_paths: - p = Path(os.path.normcase(p.resolve())) try: - candidates = [x for x in os.listdir(p) if os.path.isdir(os.path.join(p, x)) and is_valid_name(str(x))] + candidates = [x for x in p.iterdir() if x.is_dir() and is_valid_name(x.name)] except OSError: # pragma: no cover candidates = [] - if candidates and base(p) in suspicious_base_names: + if candidates and p.name in suspicious_base_names: report = ( "Possibly incorrect usage detected: input path %s is likely incorrect because the last path component " "should be the root namespace name rather than its parent directory. You probably meant:\n%s" ) % ( p, - "\n".join(("- %s" % os.path.join(p, s)) for s in candidates), + "\n".join(("- %s" % (p / s)) for s in candidates), ) reporter(report) def _ensure_no_nested_root_namespaces(directories: Iterable[Path]) -> None: - dir_str = list(sorted([os.path.join(os.path.abspath(x), "") for x in set(directories)])) - for a in dir_str: - for b in dir_str: - if (a != b) and a.startswith(b): + dirs = {x.resolve() for x in directories} # normalize the case in case-insensitive filesystems + for a in dirs: + for b in dirs: + if a.samefile(b): + continue + try: + a.relative_to(b) + except ValueError: + pass + else: raise NestedRootNamespaceError( - "The following namespace is nested inside this one, which is not permitted: %s" % a, path=Path(b) + "The following namespace is nested inside this one, which is not permitted: %s" % a, path=b ) def _ensure_no_namespace_name_collisions(directories: Iterable[Path]) -> None: - directories = list(sorted([x.resolve() for x in set(directories)])) + directories = {x.resolve() for x in directories} # normalize the case in case-insensitive filesystems for a in directories: for b in directories: - if (a != b) and a.name.lower() == b.name.lower(): + if a.samefile(b): + continue + if a.name.lower() == b.name.lower(): _logger.info("Collision: %r [%r] == %r [%r]", a, a.name, b, b.name) raise RootNamespaceNameCollisionError("The name of this namespace conflicts with %s" % b, path=a) -def _construct_dsdl_definitions_from_namespace( - root_namespace_path: Path, -) -> List[_dsdl_definition.DSDLDefinition]: +def _construct_dsdl_definitions_from_namespace(root_namespace_path: Path) -> List[_dsdl_definition.DSDLDefinition]: """ Accepts a directory path, returns a sorted list of abstract DSDL file representations. Those can be read later. The definitions are sorted by name lexicographically, then by major version (greatest version first), then by minor version (same ordering as the major version). """ - - def on_walk_error(os_ex: Exception) -> None: - raise os_ex # pragma: no cover - - walker = os.walk(root_namespace_path, onerror=on_walk_error, followlinks=True) - source_file_paths: Set[Path] = set() - for root, _dirnames, filenames in walker: - for glb in _DSDL_FILE_GLOBS: - for filename in fnmatch.filter(filenames, glb): - source_file_paths.add(Path(root, filename)) + for p in root_namespace_path.rglob(DSDL_FILE_GLOB): + source_file_paths.add(p) + for p in root_namespace_path.rglob(DSDL_FILE_GLOB_LEGACY): + source_file_paths.add(p) + _logger.warning( + "File uses deprecated extension %r, please rename to use %r: %s", DSDL_FILE_GLOB_LEGACY, DSDL_FILE_GLOB, p + ) output = [] # type: List[_dsdl_definition.DSDLDefinition] for fp in sorted(source_file_paths): @@ -476,166 +466,193 @@ def _unittest_dsdl_definition_constructor() -> None: import tempfile from ._dsdl_definition import FileNameFormatError - directory = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with - root_ns_dir = Path(directory.name, "foo").resolve() - (root_ns_dir / "nested").mkdir(parents=True) - - def touchy(relative_path: str) -> None: - p = os.path.join(root_ns_dir, relative_path.replace("/", os.path.sep)) - os.makedirs(os.path.dirname(p), exist_ok=True) - with open(p, "w") as f: - f.write("# TEST TEXT") - - def discard(relative_path: str) -> None: - os.unlink(os.path.join(root_ns_dir, relative_path)) - - touchy("123.Qwerty.123.234.dsdl") - touchy("nested/2.Asd.21.32.dsdl") - touchy("nested/Foo.32.43.dsdl") - - dsdl_defs = _construct_dsdl_definitions_from_namespace(root_ns_dir) - print(dsdl_defs) - lut = {x.full_name: x for x in dsdl_defs} # type: Dict[str, _dsdl_definition.DSDLDefinition] - assert len(lut) == 3 - - assert str(lut["foo.Qwerty"]) == repr(lut["foo.Qwerty"]) - assert ( - str(lut["foo.Qwerty"]) - == "DSDLDefinition(full_name='foo.Qwerty', version=Version(major=123, minor=234), fixed_port_id=123, " - "file_path=%s)" % lut["foo.Qwerty"].file_path - ) + with tempfile.TemporaryDirectory() as directory: + di = Path(directory).resolve() + root = di / "foo" + (root / "nested").mkdir(parents=True) + + (root / "123.Qwerty.123.234.dsdl").write_text("# TEST A") + (root / "nested/2.Asd.21.32.dsdl").write_text("# TEST B") + (root / "nested/Foo.32.43.dsdl").write_text("# TEST C") + + dsdl_defs = _construct_dsdl_definitions_from_namespace(root) + print(dsdl_defs) + lut = {x.full_name: x for x in dsdl_defs} # type: Dict[str, _dsdl_definition.DSDLDefinition] + assert len(lut) == 3 + + assert str(lut["foo.Qwerty"]) == repr(lut["foo.Qwerty"]) + assert ( + str(lut["foo.Qwerty"]) + == "DSDLDefinition(full_name='foo.Qwerty', version=Version(major=123, minor=234), fixed_port_id=123, " + "file_path=%s)" % lut["foo.Qwerty"].file_path + ) - assert ( - str(lut["foo.nested.Foo"]) - == "DSDLDefinition(full_name='foo.nested.Foo', version=Version(major=32, minor=43), fixed_port_id=None, " - "file_path=%s)" % lut["foo.nested.Foo"].file_path - ) + assert ( + str(lut["foo.nested.Foo"]) + == "DSDLDefinition(full_name='foo.nested.Foo', version=Version(major=32, minor=43), fixed_port_id=None, " + "file_path=%s)" % lut["foo.nested.Foo"].file_path + ) - t = lut["foo.Qwerty"] - assert t.file_path == root_ns_dir / "123.Qwerty.123.234.dsdl" - assert t.has_fixed_port_id - assert t.fixed_port_id == 123 - assert t.text == "# TEST TEXT" - assert t.version.major == 123 - assert t.version.minor == 234 - assert t.name_components == ["foo", "Qwerty"] - assert t.short_name == "Qwerty" - assert t.root_namespace == "foo" - assert t.full_namespace == "foo" - - t = lut["foo.nested.Asd"] - assert t.file_path == root_ns_dir / "nested" / "2.Asd.21.32.dsdl" - assert t.has_fixed_port_id - assert t.fixed_port_id == 2 - assert t.text == "# TEST TEXT" - assert t.version.major == 21 - assert t.version.minor == 32 - assert t.name_components == ["foo", "nested", "Asd"] - assert t.short_name == "Asd" - assert t.root_namespace == "foo" - assert t.full_namespace == "foo.nested" - - t = lut["foo.nested.Foo"] - assert t.file_path == root_ns_dir / "nested" / "Foo.32.43.dsdl" - assert not t.has_fixed_port_id - assert t.fixed_port_id is None - assert t.text == "# TEST TEXT" - assert t.version.major == 32 - assert t.version.minor == 43 - assert t.name_components == ["foo", "nested", "Foo"] - assert t.short_name == "Foo" - assert t.root_namespace == "foo" - assert t.full_namespace == "foo.nested" - - touchy("nested/Malformed.MAJOR.MINOR.dsdl") - try: - _construct_dsdl_definitions_from_namespace(root_ns_dir) - except FileNameFormatError as ex: - print(ex) - discard("nested/Malformed.MAJOR.MINOR.dsdl") - else: # pragma: no cover - assert False - - touchy("nested/NOT_A_NUMBER.Malformed.1.0.dsdl") - try: - _construct_dsdl_definitions_from_namespace(root_ns_dir) - except FileNameFormatError as ex: - print(ex) - discard("nested/NOT_A_NUMBER.Malformed.1.0.dsdl") - else: # pragma: no cover - assert False - - touchy("nested/Malformed.dsdl") - try: - _construct_dsdl_definitions_from_namespace(root_ns_dir) - except FileNameFormatError as ex: - print(ex) - discard("nested/Malformed.dsdl") - else: # pragma: no cover - assert False - - _construct_dsdl_definitions_from_namespace(root_ns_dir) # making sure all errors are cleared - - touchy("nested/super.bad/Unreachable.1.0.dsdl") - try: - _construct_dsdl_definitions_from_namespace(root_ns_dir) - except FileNameFormatError as ex: - print(ex) - else: # pragma: no cover - assert False - - try: - _construct_dsdl_definitions_from_namespace(root_ns_dir / "nested/super.bad") - except FileNameFormatError as ex: - print(ex) - else: # pragma: no cover - assert False - - discard("nested/super.bad/Unreachable.1.0.dsdl") + t = lut["foo.Qwerty"] + assert t.file_path == root / "123.Qwerty.123.234.dsdl" + assert t.has_fixed_port_id + assert t.fixed_port_id == 123 + assert t.text == "# TEST A" + assert t.version.major == 123 + assert t.version.minor == 234 + assert t.name_components == ["foo", "Qwerty"] + assert t.short_name == "Qwerty" + assert t.root_namespace == "foo" + assert t.full_namespace == "foo" + + t = lut["foo.nested.Asd"] + assert t.file_path == root / "nested" / "2.Asd.21.32.dsdl" + assert t.has_fixed_port_id + assert t.fixed_port_id == 2 + assert t.text == "# TEST B" + assert t.version.major == 21 + assert t.version.minor == 32 + assert t.name_components == ["foo", "nested", "Asd"] + assert t.short_name == "Asd" + assert t.root_namespace == "foo" + assert t.full_namespace == "foo.nested" + + t = lut["foo.nested.Foo"] + assert t.file_path == root / "nested" / "Foo.32.43.dsdl" + assert not t.has_fixed_port_id + assert t.fixed_port_id is None + assert t.text == "# TEST C" + assert t.version.major == 32 + assert t.version.minor == 43 + assert t.name_components == ["foo", "nested", "Foo"] + assert t.short_name == "Foo" + assert t.root_namespace == "foo" + assert t.full_namespace == "foo.nested" + + (root / "nested/Malformed.MAJOR.MINOR.dsdl").touch() + try: + _construct_dsdl_definitions_from_namespace(root) + except FileNameFormatError as ex: + print(ex) + (root / "nested/Malformed.MAJOR.MINOR.dsdl").unlink() + else: # pragma: no cover + assert False + + (root / "nested/NOT_A_NUMBER.Malformed.1.0.dsdl").touch() + try: + _construct_dsdl_definitions_from_namespace(root) + except FileNameFormatError as ex: + print(ex) + (root / "nested/NOT_A_NUMBER.Malformed.1.0.dsdl").unlink() + else: # pragma: no cover + assert False + + (root / "nested/Malformed.dsdl").touch() + try: + _construct_dsdl_definitions_from_namespace(root) + except FileNameFormatError as ex: + print(ex) + (root / "nested/Malformed.dsdl").unlink() + else: # pragma: no cover + assert False + + _construct_dsdl_definitions_from_namespace(root) # making sure all errors are cleared + + (root / "nested/super.bad").mkdir() + (root / "nested/super.bad/Unreachable.1.0.dsdl").touch() + try: + _construct_dsdl_definitions_from_namespace(root) + except FileNameFormatError as ex: + print(ex) + else: # pragma: no cover + assert False + + try: + _construct_dsdl_definitions_from_namespace(root / "nested/super.bad") + except FileNameFormatError as ex: + print(ex) + else: # pragma: no cover + assert False + + (root / "nested/super.bad/Unreachable.1.0.dsdl").unlink() + + +def _unittest_dsdl_definition_constructor_legacy() -> None: + import tempfile + + with tempfile.TemporaryDirectory() as directory: + di = Path(directory).resolve() + root = di / "foo" + root.mkdir() + (root / "123.Qwerty.123.234.uavcan").write_text("# TEST A") + dsdl_defs = _construct_dsdl_definitions_from_namespace(root) + print(dsdl_defs) + lut = {x.full_name: x for x in dsdl_defs} # type: Dict[str, _dsdl_definition.DSDLDefinition] + assert len(lut) == 1 + t = lut["foo.Qwerty"] + assert t.file_path == root / "123.Qwerty.123.234.uavcan" + assert t.has_fixed_port_id + assert t.fixed_port_id == 123 + assert t.text == "# TEST A" + assert t.version.major == 123 + assert t.version.minor == 234 + assert t.name_components == ["foo", "Qwerty"] + assert t.short_name == "Qwerty" + assert t.root_namespace == "foo" + assert t.full_namespace == "foo" def _unittest_common_usage_errors() -> None: import tempfile - directory = tempfile.TemporaryDirectory() # pylint: disable=consider-using-with - root_ns_dir = Path(os.path.join(directory.name, "foo")) - os.mkdir(root_ns_dir) + with tempfile.TemporaryDirectory() as directory: + di = Path(directory) + root_ns_dir = di / "foo" + root_ns_dir.mkdir() - reports = [] # type: List[str] + reports = [] # type: List[str] - _ensure_no_common_usage_errors(root_ns_dir, [], reports.append) - assert not reports - _ensure_no_common_usage_errors(root_ns_dir, [Path("/baz")], reports.append) - assert not reports + _ensure_no_common_usage_errors(root_ns_dir, [], reports.append) + assert not reports + _ensure_no_common_usage_errors(root_ns_dir, [di / "baz"], reports.append) + assert not reports - dir_dsdl = root_ns_dir / "dsdl" - os.mkdir(dir_dsdl) - _ensure_no_common_usage_errors(dir_dsdl, [Path("/baz")], reports.append) - assert not reports # Because empty. + dir_dsdl = root_ns_dir / "dsdl" + dir_dsdl.mkdir() + _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + assert not reports # Because empty. - dir_dsdl_vscode = os.path.join(dir_dsdl, ".vscode") - os.mkdir(dir_dsdl_vscode) - _ensure_no_common_usage_errors(dir_dsdl, [Path("/baz")], reports.append) - assert not reports # Because the name is not valid. + dir_dsdl_vscode = dir_dsdl / ".vscode" + dir_dsdl_vscode.mkdir() + _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + assert not reports # Because the name is not valid. - dir_dsdl_uavcan = os.path.join(dir_dsdl, "uavcan") - os.mkdir(dir_dsdl_uavcan) - _ensure_no_common_usage_errors(dir_dsdl, [Path("/baz")], reports.append) - (rep,) = reports - reports.clear() - assert os.path.normcase(dir_dsdl_uavcan) in rep + dir_dsdl_uavcan = dir_dsdl / "uavcan" + dir_dsdl_uavcan.mkdir() + _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + (rep,) = reports + reports.clear() + assert str(dir_dsdl_uavcan).lower() in rep.lower() def _unittest_nested_roots() -> None: from pytest import raises + import tempfile - _ensure_no_nested_root_namespaces([]) - _ensure_no_nested_root_namespaces([Path("a")]) - _ensure_no_nested_root_namespaces([Path("a/b"), Path("a/c")]) - with raises(NestedRootNamespaceError): - _ensure_no_nested_root_namespaces([Path("a/b"), Path("a")]) - _ensure_no_nested_root_namespaces([Path("aa/b"), Path("a")]) - _ensure_no_nested_root_namespaces([Path("a/b"), Path("aa")]) + with tempfile.TemporaryDirectory() as directory: + di = Path(directory) + (di / "a").mkdir() + (di / "aa").mkdir() + (di / "a/b").mkdir() + (di / "a/c").mkdir() + (di / "aa/b").mkdir() + _ensure_no_nested_root_namespaces([]) + _ensure_no_nested_root_namespaces([di / "a"]) + _ensure_no_nested_root_namespaces([di / "a/b", di / "a/c"]) + with raises(NestedRootNamespaceError): + _ensure_no_nested_root_namespaces([di / "a/b", di / "a"]) + _ensure_no_nested_root_namespaces([di / "aa/b", di / "a"]) + _ensure_no_nested_root_namespaces([di / "a/b", di / "aa"]) def _unittest_issue_71() -> None: # https://github.com/OpenCyphal/pydsdl/issues/71 diff --git a/pydsdl/_parser.py b/pydsdl/_parser.py index 6c8e508..b73a533 100644 --- a/pydsdl/_parser.py +++ b/pydsdl/_parser.py @@ -2,12 +2,12 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko -import os import typing import logging import itertools import functools import fractions +from pathlib import Path from typing import List, Tuple import parsimonious from parsimonious.nodes import Node as _Node @@ -89,8 +89,7 @@ def resolve_versioned_data_type(self, name: str, version: _serializable.Version) @functools.lru_cache(None) def _get_grammar() -> parsimonious.Grammar: - with open(os.path.join(os.path.dirname(__file__), "grammar.parsimonious")) as _grammar_file: - return parsimonious.Grammar(_grammar_file.read()) # type: ignore + return parsimonious.Grammar((Path(__file__).parent / "grammar.parsimonious").read_text()) # type: ignore _logger = logging.getLogger(__name__) diff --git a/pydsdl/_test.py b/pydsdl/_test.py index 9c4babf..f81f735 100644 --- a/pydsdl/_test.py +++ b/pydsdl/_test.py @@ -2,14 +2,13 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko -# pylint: disable=global-statement,protected-access,too-many-statements,consider-using-with +# pylint: disable=global-statement,protected-access,too-many-statements,consider-using-with,redefined-outer-name -import os -import typing import tempfile -from typing import Union, Tuple, Optional +from typing import Union, Tuple, Optional, Sequence, Type, Iterable from pathlib import Path from textwrap import dedent +import pytest # This is only safe to import in test files! from . import _expression from . import _error from . import _parser @@ -18,15 +17,48 @@ from . import _serializable from . import _namespace -# Type annotation disabled here because MyPy is misbehaving, reporting these nonsensical error messages: -# pydsdl/_test.py:18: error: Missing type parameters for generic type -# pydsdl/_test.py: note: In function "_in_n_out": -# pydsdl/_test.py:18: error: Missing type parameters for generic type -_DIRECTORY = None # type : typing.Optional[tempfile.TemporaryDirectory] +__all__ = [] # type: ignore -def _parse_definition( - definition: _dsdl_definition.DSDLDefinition, lookup_definitions: typing.Sequence[_dsdl_definition.DSDLDefinition] +class Workspace: + def __init__(self) -> None: + self._tmp_dir = tempfile.TemporaryDirectory(prefix="pydsdl-test-") + + @property + def directory(self) -> Path: + return Path(self._tmp_dir.name) + + def new(self, rel_path: Union[str, Path], text: str) -> None: + """ + Simply creates a new DSDL source file with the given contents at the specified path inside the workspace. + """ + rel_path = Path(rel_path) + path = self.directory / rel_path + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(text, encoding="utf8") + + def parse_new(self, rel_path: Union[str, Path], text: str) -> _dsdl_definition.DSDLDefinition: + """ + Creates a new DSDL source file with the given contents at the specified path inside the workspace, + then parses it and returns the resulting definition object. + """ + rel_path = Path(rel_path) + self.new(rel_path, text) + path = self.directory / rel_path + root_namespace_path = self.directory / rel_path.parts[0] + out = _dsdl_definition.DSDLDefinition(path, root_namespace_path) + return out + + def drop(self, rel_path_glob: str) -> None: + """ + Deletes all files matching the specified glob pattern. + """ + for g in self.directory.glob(rel_path_glob): + g.unlink() + + +def parse_definition( + definition: _dsdl_definition.DSDLDefinition, lookup_definitions: Sequence[_dsdl_definition.DSDLDefinition] ) -> _serializable.CompositeType: return definition.read( lookup_definitions, @@ -35,57 +67,31 @@ def _parse_definition( ) -def _define(rel_path: Union[str, Path], text: str) -> _dsdl_definition.DSDLDefinition: - rel_path = str(rel_path).replace("/", os.sep) # Windows compatibility - assert _DIRECTORY - path = Path(_DIRECTORY.name, rel_path) - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, "w", encoding="utf8") as f: - f.write(text) - - root_namespace_path = Path(_DIRECTORY.name, rel_path.strip(os.sep).split(os.sep, maxsplit=1)[0]) - out = _dsdl_definition.DSDLDefinition(path, root_namespace_path) - print("New definition:", out, "Root NS:", root_namespace_path) - return out - - -def _in_n_out(test: typing.Callable[[], None]) -> typing.Callable[[], None]: - def decorator() -> None: - global _DIRECTORY - _DIRECTORY = tempfile.TemporaryDirectory(prefix="pydsdl-test-") - try: - test() - finally: - _DIRECTORY = None # Preserving the contents for future inspection if needed - - return decorator +@pytest.fixture() # type: ignore +def wrkspc() -> Workspace: + return Workspace() -@_in_n_out -def _unittest_define() -> None: - # I DON'T ALWAYS WRITE UNIT TESTS - d = _define("uavcan/test/5000.Message.1.2.dsdl", "# empty") - assert _DIRECTORY is not None +def _unittest_define(wrkspc: Workspace) -> None: + d = wrkspc.parse_new("uavcan/test/5000.Message.1.2.dsdl", "# empty") assert d.full_name == "uavcan.test.Message" assert d.version == (1, 2) assert d.fixed_port_id == 5000 - assert d.file_path.samefile(Path(_DIRECTORY.name, "uavcan", "test", "5000.Message.1.2.dsdl")) - assert d.root_namespace_path.samefile(Path(_DIRECTORY.name, "uavcan")) - assert open(d.file_path).read() == "# empty" + assert d.file_path.samefile(Path(wrkspc.directory, "uavcan", "test", "5000.Message.1.2.dsdl")) + assert d.root_namespace_path.samefile(wrkspc.directory / "uavcan") + assert d.file_path.read_text() == "# empty" - # BUT WHEN I DO, I WRITE UNIT TESTS FOR MY UNIT TESTS - d = _define("uavcan/Service.255.254.dsdl", "# empty 2") + d = wrkspc.parse_new("uavcan/Service.255.254.dsdl", "# empty 2") assert d.full_name == "uavcan.Service" assert d.version == (255, 254) assert d.fixed_port_id is None - assert d.file_path.samefile(Path(_DIRECTORY.name, "uavcan", "Service.255.254.dsdl")) - assert d.root_namespace_path.samefile(Path(_DIRECTORY.name, "uavcan")) - assert open(d.file_path).read() == "# empty 2" + assert d.file_path.samefile(Path(wrkspc.directory, "uavcan", "Service.255.254.dsdl")) + assert d.root_namespace_path.samefile(wrkspc.directory / "uavcan") + assert d.file_path.read_text() == "# empty 2" -@_in_n_out -def _unittest_simple() -> None: - abc = _define( +def _unittest_simple(wrkspc: Workspace) -> None: + abc = wrkspc.parse_new( "vendor/nested/7000.Abc.1.2.dsdl", dedent( """ @@ -101,13 +107,13 @@ def _unittest_simple() -> None: assert abc.full_name == "vendor.nested.Abc" assert abc.version == (1, 2) - p = _parse_definition(abc, []) + p = parse_definition(abc, []) print("Parsed:", p) assert isinstance(p, _serializable.DelimitedType) assert isinstance(p.inner_type, _serializable.StructureType) assert p.full_name == "vendor.nested.Abc" - assert str(p.source_file_path).endswith(os.path.join("vendor", "nested", "7000.Abc.1.2.dsdl")) - assert p.source_file_path == abc.file_path + assert p.source_file_path.parts[-3:] == ("vendor", "nested", "7000.Abc.1.2.dsdl") + assert p.source_file_path.samefile(abc.file_path) assert p.fixed_port_id == 7000 assert p.deprecated assert p.version == (1, 2) @@ -131,11 +137,11 @@ def _unittest_simple() -> None: assert isinstance(t, _serializable.ArrayType) assert str(t.element_type) == "saturated int64" - empty_new = _define("vendor/nested/Empty.255.255.dsdl", """@sealed""") + empty_new = wrkspc.parse_new("vendor/nested/Empty.255.255.dsdl", """@sealed""") - empty_old = _define("vendor/nested/Empty.255.254.dsdl", """@sealed""") + empty_old = wrkspc.parse_new("vendor/nested/Empty.255.254.dsdl", """@sealed""") - constants = _define( + constants = wrkspc.parse_new( "another/Constants.5.0.dsdl", dedent( """ @@ -145,7 +151,7 @@ def _unittest_simple() -> None: ), ) - service = _define( + service = wrkspc.parse_new( "another/300.Service.0.1.dsdl", dedent( """ @@ -163,7 +169,7 @@ def _unittest_simple() -> None: ), ) - p = _parse_definition( + p = parse_definition( service, [ abc, @@ -243,7 +249,7 @@ def _unittest_simple() -> None: assert t.full_name == "vendor.nested.Abc" assert t.version == (1, 2) - p2 = _parse_definition( + p2 = parse_definition( abc, [ service, @@ -256,7 +262,7 @@ def _unittest_simple() -> None: assert hash(p2) != hash(p) assert hash(p) == hash(p) - union = _define( + union = wrkspc.parse_new( "another/Union.5.9.dsdl", dedent( """ @@ -270,7 +276,7 @@ def _unittest_simple() -> None: ), ) - p = _parse_definition( + p = parse_definition( union, [ empty_old, @@ -296,9 +302,8 @@ def _unittest_simple() -> None: assert str(p.fields[2]) == "saturated bool[<=255] c" -@_in_n_out -def _unittest_comments() -> None: - abc = _define( +def _unittest_comments(wrkspc: Workspace) -> None: + abc = wrkspc.parse_new( "vendor/nested/7000.Abc.1.2.dsdl", dedent( """\ @@ -320,7 +325,7 @@ def _unittest_comments() -> None: ), ) - p = _parse_definition(abc, []) + p = parse_definition(abc, []) print("Parsed:", p) print(p.doc.__repr__()) # assert p.doc == "header comment here\nmultiline" @@ -329,11 +334,11 @@ def _unittest_comments() -> None: assert p.fields[2].doc == "comment on padding field" assert p.fields[3].doc == "comment on array\nand another" - empty_new = _define("vendor/nested/Empty.255.255.dsdl", """@sealed""") + empty_new = wrkspc.parse_new("vendor/nested/Empty.255.255.dsdl", """@sealed""") - empty_old = _define("vendor/nested/Empty.255.254.dsdl", """@sealed""") + empty_old = wrkspc.parse_new("vendor/nested/Empty.255.254.dsdl", """@sealed""") - constants = _define( + constants = wrkspc.parse_new( "another/Constants.5.0.dsdl", dedent( """ @@ -343,11 +348,11 @@ def _unittest_comments() -> None: ), ) - p = _parse_definition(constants, []) + p = parse_definition(constants, []) assert p.doc == "" assert p.constants[0].doc == "no header comment" - service = _define( + service = wrkspc.parse_new( "another/300.Service.0.1.dsdl", dedent( """\ @@ -369,7 +374,7 @@ def _unittest_comments() -> None: ), ) - p = _parse_definition( + p = parse_definition( service, [ abc, @@ -383,7 +388,7 @@ def _unittest_comments() -> None: assert req.doc == "first header comment here\nmultiline" # type: ignore assert res.doc == "second header comment here\nmultiline" # type: ignore - union = _define( + union = wrkspc.parse_new( "another/Union.5.9.dsdl", dedent( """ @@ -398,7 +403,7 @@ def _unittest_comments() -> None: ), ) - p = _parse_definition( + p = parse_definition( union, [ empty_old, @@ -410,12 +415,15 @@ def _unittest_comments() -> None: # noinspection PyProtectedMember,PyProtectedMember -@_in_n_out -def _unittest_error() -> None: + + +def _unittest_error(wrkspc: Workspace) -> None: from pytest import raises def standalone(rel_path: str, definition: str, allow_unregulated: bool = False) -> _serializable.CompositeType: - return _define(rel_path, definition + "\n").read([], lambda *_: None, allow_unregulated) # pragma: no branch + return wrkspc.parse_new(rel_path, definition + "\n").read( + [], lambda *_: None, allow_unregulated + ) # pragma: no branch with raises(_error.InvalidDefinitionError, match="(?i).*port ID.*"): standalone("vendor/1000.InvalidRegulatedSubjectID.1.0.dsdl", "uint2 value\n@sealed") @@ -559,59 +567,61 @@ def standalone(rel_path: str, definition: str, allow_unregulated: bool = False) ) with raises(_data_type_builder.UndefinedDataTypeError, match=r".*ns.Type_.*1\.0"): - _parse_definition( - _define("vendor/types/A.1.0.dsdl", "ns.Type_.1.0 field\n@sealed"), + parse_definition( + wrkspc.parse_new("vendor/types/A.1.0.dsdl", "ns.Type_.1.0 field\n@sealed"), [ - _define("ns/Type_.2.0.dsdl", "@sealed"), + wrkspc.parse_new("ns/Type_.2.0.dsdl", "@sealed"), ], ) with raises(_error.InvalidDefinitionError, match="(?i).*Bit length cannot exceed.*"): - _parse_definition( - _define("vendor/types/A.1.0.dsdl", "int128 field\n@sealed"), + parse_definition( + wrkspc.parse_new("vendor/types/A.1.0.dsdl", "int128 field\n@sealed"), [ - _define("ns/Type_.2.0.dsdl", "@sealed"), - _define("ns/Type_.1.1.dsdl", "@sealed"), + wrkspc.parse_new("ns/Type_.2.0.dsdl", "@sealed"), + wrkspc.parse_new("ns/Type_.1.1.dsdl", "@sealed"), ], ) with raises(_error.InvalidDefinitionError, match="(?i).*type.*"): - _parse_definition( - _define("vendor/invalid_constant_value/A.1.0.dsdl", "ns.Type_.1.1 VALUE = 123\n@sealed"), + parse_definition( + wrkspc.parse_new("vendor/invalid_constant_value/A.1.0.dsdl", "ns.Type_.1.1 VALUE = 123\n@sealed"), [ - _define("ns/Type_.2.0.dsdl", "@sealed"), - _define("ns/Type_.1.1.dsdl", "@sealed"), + wrkspc.parse_new("ns/Type_.2.0.dsdl", "@sealed"), + wrkspc.parse_new("ns/Type_.1.1.dsdl", "@sealed"), ], ) with raises(_data_type_builder.UndefinedDataTypeError): defs = [ - _define("vendor/circular_dependency/A.1.0.dsdl", "B.1.0 b\n@sealed"), - _define("vendor/circular_dependency/B.1.0.dsdl", "A.1.0 b\n@sealed"), + wrkspc.parse_new("vendor/circular_dependency/A.1.0.dsdl", "B.1.0 b\n@sealed"), + wrkspc.parse_new("vendor/circular_dependency/B.1.0.dsdl", "A.1.0 b\n@sealed"), ] - _parse_definition(defs[0], defs) + parse_definition(defs[0], defs) with raises(_error.InvalidDefinitionError, match="(?i).*union directive.*"): - _parse_definition( - _define("vendor/misplaced_directive/A.1.0.dsdl", "ns.Type_.2.0 field\n@union\n@sealed"), + parse_definition( + wrkspc.parse_new("vendor/misplaced_directive/A.1.0.dsdl", "ns.Type_.2.0 field\n@union\n@sealed"), [ - _define("ns/Type_.2.0.dsdl", "@sealed"), + wrkspc.parse_new("ns/Type_.2.0.dsdl", "@sealed"), ], ) with raises(_error.InvalidDefinitionError, match="(?i).*deprecated directive.*"): - _parse_definition( - _define("vendor/misplaced_directive/A.1.0.dsdl", "ns.Type_.2.0 field\n@deprecated\n@sealed"), + parse_definition( + wrkspc.parse_new("vendor/misplaced_directive/A.1.0.dsdl", "ns.Type_.2.0 field\n@deprecated\n@sealed"), [ - _define("ns/Type_.2.0.dsdl", "@sealed"), + wrkspc.parse_new("ns/Type_.2.0.dsdl", "@sealed"), ], ) with raises(_error.InvalidDefinitionError, match="(?i).*deprecated directive.*"): - _parse_definition( - _define("vendor/misplaced_directive/A.1.0.dsdl", "ns.Type_.2.0 field\n@sealed\n---\n@deprecated\n@sealed"), + parse_definition( + wrkspc.parse_new( + "vendor/misplaced_directive/A.1.0.dsdl", "ns.Type_.2.0 field\n@sealed\n---\n@deprecated\n@sealed" + ), [ - _define("ns/Type_.2.0.dsdl", "@sealed"), + wrkspc.parse_new("ns/Type_.2.0.dsdl", "@sealed"), ], ) @@ -629,7 +639,7 @@ def standalone(rel_path: str, definition: str, allow_unregulated: bool = False) ), ) except _error.FrontendError as ex: - assert ex.path and str(ex.path).endswith(os.path.join("vendor", "types", "A.1.0.dsdl")) + assert ex.path and ex.path.parts[-3:] == ("vendor", "types", "A.1.0.dsdl") assert ex.line and ex.line == 4 else: # pragma: no cover assert False @@ -734,15 +744,14 @@ def standalone(rel_path: str, definition: str, allow_unregulated: bool = False) ) -@_in_n_out -def _unittest_print() -> None: +def _unittest_print(wrkspc: Workspace) -> None: printed_items = None # type: Optional[Tuple[int, str]] def print_handler(line_number: int, text: str) -> None: nonlocal printed_items printed_items = line_number, text - _define( + wrkspc.parse_new( "ns/A.1.0.dsdl", "# line number 1\n" "# line number 2\n" "@print 2 + 2 == 4 # line number 3\n" "# line number 4\n" "@sealed\n", ).read([], print_handler, False) @@ -751,12 +760,12 @@ def print_handler(line_number: int, text: str) -> None: assert printed_items[0] == 3 assert printed_items[1] == "true" - _define("ns/B.1.0.dsdl", "@print false\n@sealed").read([], print_handler, False) + wrkspc.parse_new("ns/B.1.0.dsdl", "@print false\n@sealed").read([], print_handler, False) assert printed_items assert printed_items[0] == 1 assert printed_items[1] == "false" - _define( + wrkspc.parse_new( "ns/Offset.1.0.dsdl", "@print _offset_ # Not recorded\n" "uint8 a\n" "@print _offset_\n" "@extent 800\n" ).read([], print_handler, False) assert printed_items @@ -765,12 +774,13 @@ def print_handler(line_number: int, text: str) -> None: # noinspection PyProtectedMember -@_in_n_out -def _unittest_assert() -> None: + + +def _unittest_assert(wrkspc: Workspace) -> None: from pytest import raises - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( """ @@ -803,12 +813,12 @@ def _unittest_assert() -> None: """ ), ), - [_define("ns/Array.1.0.dsdl", "uint8[<=2] foo\n@sealed")], + [wrkspc.parse_new("ns/Array.1.0.dsdl", "uint8[<=2] foo\n@sealed")], ) with raises(_error.InvalidDefinitionError, match="(?i).*operator is not defined.*"): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/C.1.0.dsdl", dedent( """ @@ -822,25 +832,27 @@ def _unittest_assert() -> None: ) with raises(_expression.UndefinedAttributeError): - _parse_definition( - _define("ns/C.1.0.dsdl", "@print Service.1.0._bit_length_"), - [_define("ns/Service.1.0.dsdl", "uint8 a\n@sealed\n---\nuint16 b\n@sealed")], + parse_definition( + wrkspc.parse_new("ns/C.1.0.dsdl", "@print Service.1.0._bit_length_"), + [wrkspc.parse_new("ns/Service.1.0.dsdl", "uint8 a\n@sealed\n---\nuint16 b\n@sealed")], ) with raises(_expression.UndefinedAttributeError): - _parse_definition(_define("ns/C.1.0.dsdl", """uint64 LENGTH = uint64.nonexistent_attribute\n@extent 0"""), []) + parse_definition( + wrkspc.parse_new("ns/C.1.0.dsdl", """uint64 LENGTH = uint64.nonexistent_attribute\n@extent 0"""), [] + ) with raises(_error.InvalidDefinitionError, match="(?i).*void.*"): - _parse_definition(_define("ns/C.1.0.dsdl", "void2 name\n@sealed"), []) + parse_definition(wrkspc.parse_new("ns/C.1.0.dsdl", "void2 name\n@sealed"), []) with raises(_serializable._attribute.InvalidConstantValueError): - _parse_definition(_define("ns/C.1.0.dsdl", "int8 name = true\n@sealed"), []) + parse_definition(wrkspc.parse_new("ns/C.1.0.dsdl", "int8 name = true\n@sealed"), []) with raises(_error.InvalidDefinitionError, match=".*value.*"): - _parse_definition(_define("ns/C.1.0.dsdl", "int8 name = {1, 2, 3}\n@sealed"), []) + parse_definition(wrkspc.parse_new("ns/C.1.0.dsdl", "int8 name = {1, 2, 3}\n@sealed"), []) - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/D.1.0.dsdl", dedent( """ @@ -855,8 +867,8 @@ def _unittest_assert() -> None: [], ) - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/E.1.0.dsdl", dedent( """ @@ -876,8 +888,8 @@ def _unittest_assert() -> None: ) with raises(_error.InvalidDefinitionError): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/F.1.0.dsdl", dedent( """ @@ -894,8 +906,8 @@ def _unittest_assert() -> None: ) with raises(_data_type_builder.AssertionCheckFailureError): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/G.1.0.dsdl", dedent( """ @@ -909,8 +921,8 @@ def _unittest_assert() -> None: ) with raises(_error.InvalidDefinitionError, match="(?i).*yield a boolean.*"): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/H.1.0.dsdl", dedent( """ @@ -924,8 +936,8 @@ def _unittest_assert() -> None: ) # Extent verification - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/I.1.0.dsdl", dedent( """ @@ -938,14 +950,14 @@ def _unittest_assert() -> None: ), ), [ - _define("ns/J.1.0.dsdl", "uint8 foo\n@extent 64"), - _define("ns/K.1.0.dsdl", "uint8 foo\n@sealed"), + wrkspc.parse_new("ns/J.1.0.dsdl", "uint8 foo\n@extent 64"), + wrkspc.parse_new("ns/K.1.0.dsdl", "uint8 foo\n@sealed"), ], ) # Alignment - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/L.1.0.dsdl", dedent( """ @@ -968,34 +980,25 @@ def _unittest_assert() -> None: ), ), [ - _define("ns/M.1.0.dsdl", "@extent 16"), - _define("ns/N.1.0.dsdl", "@sealed"), + wrkspc.parse_new("ns/M.1.0.dsdl", "@extent 16"), + wrkspc.parse_new("ns/N.1.0.dsdl", "@sealed"), ], ) -def _unittest_parse_namespace() -> None: +def _unittest_parse_namespace(wrkspc: Workspace) -> None: from pytest import raises - directory = tempfile.TemporaryDirectory() - print_output = None # type: Optional[Tuple[str, int, str]] def print_handler(d: Path, line: int, text: str) -> None: nonlocal print_output print_output = str(d), line, text - # noinspection PyShadowingNames - def _define(rel_path: str, text: str) -> None: - path = Path(directory.name, rel_path) - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, "w") as f: - f.write(text) - # Empty namespace. - assert [] == _namespace.read_namespace(directory.name) + assert [] == _namespace.read_namespace(wrkspc.directory) - _define( + wrkspc.new( "zubax/First.1.0.dsdl", dedent( """ @@ -1007,7 +1010,7 @@ def _define(rel_path: str, text: str) -> None: ), ) - _define( + wrkspc.new( "zubax/7001.Message.1.0.dsdl", dedent( """ @@ -1019,7 +1022,7 @@ def _define(rel_path: str, text: str) -> None: ), ) - _define( + wrkspc.new( "zubax/nested/300.Spartans.30.0.dsdl", dedent( """ @@ -1036,12 +1039,12 @@ def _define(rel_path: str, text: str) -> None: ), ) - _define("zubax/nested/300.Spartans.30.0.txt", "completely unrelated stuff") - _define("zubax/300.Spartans.30.0", "completely unrelated stuff") + wrkspc.new("zubax/nested/300.Spartans.30.0.txt", "completely unrelated stuff") + wrkspc.new("zubax/300.Spartans.30.0", "completely unrelated stuff") parsed = _namespace.read_namespace( - Path(directory.name, "zubax"), - [Path(directory.name, "zubax", ".")], # Intentional duplicate + wrkspc.directory / "zubax", + [Path(wrkspc.directory, "zubax", ".")], # Intentional duplicate print_handler, ) print(parsed) @@ -1051,10 +1054,10 @@ def _define(rel_path: str, text: str) -> None: assert "zubax.nested.Spartans" in [x.full_name for x in parsed] # try again with minimal arguments to read_namespace - parsed_minimal_args = _namespace.read_namespace(Path(directory.name, "zubax")) + parsed_minimal_args = _namespace.read_namespace(wrkspc.directory / "zubax") assert len(parsed_minimal_args) == 3 - _define( + wrkspc.new( "zubax/colliding/300.Iceberg.30.0.dsdl", dedent( """ @@ -1066,21 +1069,21 @@ def _define(rel_path: str, text: str) -> None: ) with raises(_namespace.FixedPortIDCollisionError): - _namespace.read_namespace(Path(directory.name, "zubax"), [], print_handler) + _namespace.read_namespace(wrkspc.directory / "zubax", [], print_handler) with raises(TypeError): # Invalid usage: expected path-like object, not bytes. - _namespace.read_namespace(Path(directory.name, "zubax"), b"/my/path") # type: ignore + _namespace.read_namespace(wrkspc.directory / "zubax", b"/my/path") # type: ignore with raises(TypeError): # Invalid usage: expected path-like object, not bytes. # noinspection PyTypeChecker - _namespace.read_namespace(Path(directory.name, "zubax"), [b"/my/path"]) # type: ignore + _namespace.read_namespace(wrkspc.directory / "zubax", [b"/my/path"]) # type: ignore assert print_output is not None assert "300.Spartans" in print_output[0] assert print_output[1] == 9 assert print_output[2] == "{0}" - _define( + wrkspc.new( "zubax/colliding/iceberg/300.Ice.30.0.dsdl", dedent( """ @@ -1092,19 +1095,19 @@ def _define(rel_path: str, text: str) -> None: ) with raises(_namespace.DataTypeNameCollisionError): _namespace.read_namespace( - Path(directory.name, "zubax"), + wrkspc.directory / "zubax", [ - Path(directory.name, "zubax"), + wrkspc.directory / "zubax", ], ) # Do again to test single lookup-directory override with raises(_namespace.DataTypeNameCollisionError): - _namespace.read_namespace(Path(directory.name, "zubax"), Path(directory.name, "zubax")) + _namespace.read_namespace(wrkspc.directory / "zubax", wrkspc.directory / "zubax") try: - os.unlink(Path(directory.name, "zubax/colliding/iceberg/300.Ice.30.0.dsdl")) - _define( + (wrkspc.directory / "zubax/colliding/iceberg/300.Ice.30.0.dsdl").unlink() + wrkspc.new( "zubax/COLLIDING/300.Iceberg.30.0.dsdl", dedent( """ @@ -1116,21 +1119,21 @@ def _define(rel_path: str, text: str) -> None: ) with raises(_namespace.DataTypeNameCollisionError, match=".*letter case.*"): _namespace.read_namespace( - Path(directory.name, "zubax"), + wrkspc.directory / "zubax", [ - Path(directory.name, "zubax"), + wrkspc.directory / "zubax", ], ) except _namespace.FixedPortIDCollisionError: # pragma: no cover pass # We're running on a platform where paths are not case-sensitive. - # Test namespece can intersect with type name - os.unlink(Path(directory.name, "zubax/COLLIDING/300.Iceberg.30.0.dsdl")) + # Test namespace can intersect with type name + (wrkspc.directory / "zubax/COLLIDING/300.Iceberg.30.0.dsdl").unlink() try: - os.unlink(Path(directory.name, "zubax/colliding/300.Iceberg.30.0.dsdl")) - except FileNotFoundError: + ((wrkspc.directory / "zubax/colliding/300.Iceberg.30.0.dsdl")).unlink() + except FileNotFoundError: # pragma: no cover pass # We're running on a platform where paths are not case-sensitive. - _define( + wrkspc.new( "zubax/noncolliding/iceberg/Ice.1.0.dsdl", dedent( """ @@ -1140,7 +1143,7 @@ def _define(rel_path: str, text: str) -> None: """ ), ) - _define( + wrkspc.new( "zubax/noncolliding/Iceb.1.0.dsdl", dedent( """ @@ -1150,29 +1153,15 @@ def _define(rel_path: str, text: str) -> None: """ ), ) - parsed = _namespace.read_namespace(Path(directory.name, "zubax"), Path(directory.name, "zubax")) + parsed = _namespace.read_namespace(wrkspc.directory / "zubax", wrkspc.directory / "zubax") assert "zubax.noncolliding.iceberg.Ice" in [x.full_name for x in parsed] assert "zubax.noncolliding.Iceb" in [x.full_name for x in parsed] -def _unittest_parse_namespace_versioning() -> None: +def _unittest_parse_namespace_versioning(wrkspc: Workspace) -> None: from pytest import raises - import glob - - directory = tempfile.TemporaryDirectory() - - # noinspection PyShadowingNames - def _define(rel_path: str, text: str) -> None: - path = Path(directory.name, rel_path) - os.makedirs(os.path.dirname(path), exist_ok=True) - with open(path, "w") as f: - f.write(text) - def _undefine_glob(rel_path_glob: str) -> None: - for g in glob.glob(str(Path(directory.name, rel_path_glob))): - os.remove(g) - - _define( + wrkspc.new( "ns/Spartans.30.0.dsdl", dedent( """ @@ -1188,7 +1177,7 @@ def _undefine_glob(rel_path_glob: str) -> None: ), ) - _define( + wrkspc.new( "ns/Spartans.30.1.dsdl", dedent( """ @@ -1204,11 +1193,11 @@ def _undefine_glob(rel_path_glob: str) -> None: ), ) - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) print(parsed) assert len(parsed) == 2 - _define( + wrkspc.new( "ns/Spartans.30.2.dsdl", dedent( """ @@ -1223,11 +1212,11 @@ def _undefine_glob(rel_path_glob: str) -> None: ) with raises(_namespace.VersionsOfDifferentKindError): - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) - _undefine_glob("ns/Spartans.30.[01].dsdl") + wrkspc.drop("ns/Spartans.30.[01].dsdl") - _define( + wrkspc.new( "ns/Spartans.30.0.dsdl", dedent( """ @@ -1241,11 +1230,11 @@ def _undefine_glob(rel_path_glob: str) -> None: ), ) - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) print(parsed) assert len(parsed) == 2 - _define( + wrkspc.new( "ns/Spartans.30.1.dsdl", dedent( """ @@ -1259,7 +1248,7 @@ def _undefine_glob(rel_path_glob: str) -> None: ), ) - _define( + wrkspc.new( "ns/6700.Spartans.30.2.dsdl", dedent( """ @@ -1274,15 +1263,15 @@ def _undefine_glob(rel_path_glob: str) -> None: ) with raises(_namespace.MultipleDefinitionsUnderSameVersionError): - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) - _undefine_glob("ns/Spartans.30.2.dsdl") + wrkspc.drop("ns/Spartans.30.2.dsdl") - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) assert len(parsed) == 3 - _undefine_glob("ns/Spartans.30.0.dsdl") - _define( + wrkspc.drop("ns/Spartans.30.0.dsdl") + wrkspc.new( "ns/6700.Spartans.30.0.dsdl", dedent( """ @@ -1297,10 +1286,10 @@ def _undefine_glob(rel_path_glob: str) -> None: ) with raises(_namespace.MinorVersionFixedPortIDError): - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) - _undefine_glob("ns/Spartans.30.1.dsdl") - _define( + wrkspc.drop("ns/Spartans.30.1.dsdl") + wrkspc.new( "ns/6700.Spartans.30.1.dsdl", dedent( """ @@ -1314,11 +1303,11 @@ def _undefine_glob(rel_path_glob: str) -> None: ), ) - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) assert len(parsed) == 3 - _undefine_glob("ns/6700.Spartans.30.1.dsdl") - _define( + wrkspc.drop("ns/6700.Spartans.30.1.dsdl") + wrkspc.new( "ns/6701.Spartans.30.1.dsdl", dedent( """ @@ -1333,11 +1322,11 @@ def _undefine_glob(rel_path_glob: str) -> None: ) with raises(_namespace.MinorVersionFixedPortIDError): - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) # Adding new major version under the same FPID - _undefine_glob("ns/6701.Spartans.30.1.dsdl") - _define( + wrkspc.drop("ns/6701.Spartans.30.1.dsdl") + wrkspc.new( "ns/6700.Spartans.31.0.dsdl", dedent( """ @@ -1352,11 +1341,11 @@ def _undefine_glob(rel_path_glob: str) -> None: ) with raises(_namespace.FixedPortIDCollisionError): - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) # Major version zero allows us to re-use the same FPID under a different (non-zero) major version - _undefine_glob("ns/6700.Spartans.31.0.dsdl") - _define( + wrkspc.drop("ns/6700.Spartans.31.0.dsdl") + wrkspc.new( "ns/6700.Spartans.0.1.dsdl", dedent( """ @@ -1371,13 +1360,13 @@ def _undefine_glob(rel_path_glob: str) -> None: ) # These are needed to ensure full branch coverage, see the checking code. - _define("ns/Empty.1.0.dsdl", "@extent 0") - _define("ns/Empty.1.1.dsdl", "@extent 0") - _define("ns/Empty.2.0.dsdl", "@extent 0") - _define("ns/6800.Empty.3.0.dsdl", "@extent 0") - _define("ns/6801.Empty.4.0.dsdl", "@extent 0") + wrkspc.new("ns/Empty.1.0.dsdl", "@extent 0") + wrkspc.new("ns/Empty.1.1.dsdl", "@extent 0") + wrkspc.new("ns/Empty.2.0.dsdl", "@extent 0") + wrkspc.new("ns/6800.Empty.3.0.dsdl", "@extent 0") + wrkspc.new("ns/6801.Empty.4.0.dsdl", "@extent 0") - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) # no error assert len(parsed) == 8 # Check ordering - the definitions must be sorted properly by name (lexicographically) and version (newest first). @@ -1393,30 +1382,30 @@ def _undefine_glob(rel_path_glob: str) -> None: ] # Extent consistency -- non-service type - _define("ns/Consistency.1.0.dsdl", "uint8 a\n@extent 128") - _define("ns/Consistency.1.1.dsdl", "uint8 a\nuint8 b\n@extent 128") - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error + wrkspc.new("ns/Consistency.1.0.dsdl", "uint8 a\n@extent 128") + wrkspc.new("ns/Consistency.1.1.dsdl", "uint8 a\nuint8 b\n@extent 128") + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) # no error assert len(parsed) == 10 - _define("ns/Consistency.1.2.dsdl", "uint8 a\nuint8 b\nuint8 c\n@extent 256") + wrkspc.new("ns/Consistency.1.2.dsdl", "uint8 a\nuint8 b\nuint8 c\n@extent 256") with raises( _namespace.ExtentConsistencyError, match=r"(?i).*extent of ns\.Consistency\.1\.2 is 256 bits.*" ) as ei_extent: - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) print(ei_extent.value) assert ei_extent.value.path and "Consistency.1" in str(ei_extent.value.path) - _undefine_glob("ns/Consistency*") + wrkspc.drop("ns/Consistency*") # Extent consistency -- non-service type, zero major version - _define("ns/Consistency.0.1.dsdl", "uint8 a\n@extent 128") - _define("ns/Consistency.0.2.dsdl", "uint8 a\nuint8 b\n@extent 128") - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error + wrkspc.new("ns/Consistency.0.1.dsdl", "uint8 a\n@extent 128") + wrkspc.new("ns/Consistency.0.2.dsdl", "uint8 a\nuint8 b\n@extent 128") + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) # no error assert len(parsed) == 10 - _define("ns/Consistency.0.3.dsdl", "uint8 a\nuint8 b\nuint8 c\n@extent 256") # no error - _namespace.read_namespace(Path(directory.name, "ns"), []) - _undefine_glob("ns/Consistency*") + wrkspc.new("ns/Consistency.0.3.dsdl", "uint8 a\nuint8 b\nuint8 c\n@extent 256") # no error + _namespace.read_namespace((wrkspc.directory / "ns"), []) + wrkspc.drop("ns/Consistency*") # Extent consistency -- request - _define( + wrkspc.new( "ns/Consistency.1.0.dsdl", dedent( """ @@ -1428,7 +1417,7 @@ def _undefine_glob(rel_path_glob: str) -> None: """ ), ) - _define( + wrkspc.new( "ns/Consistency.1.1.dsdl", dedent( """ @@ -1441,9 +1430,9 @@ def _undefine_glob(rel_path_glob: str) -> None: """ ), ) - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) # no error assert len(parsed) == 10 - _define( + wrkspc.new( "ns/Consistency.1.2.dsdl", dedent( """ @@ -1459,13 +1448,13 @@ def _undefine_glob(rel_path_glob: str) -> None: with raises( _namespace.ExtentConsistencyError, match=r"(?i).*extent of ns\.Consistency.* is 256 bits.*" ) as ei_extent: - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) print(ei_extent.value) assert ei_extent.value.path and "Consistency.1" in str(ei_extent.value.path) - _undefine_glob("ns/Consistency*") + wrkspc.drop("ns/Consistency*") # Extent consistency -- response - _define( + wrkspc.new( "ns/Consistency.1.0.dsdl", dedent( """ @@ -1477,7 +1466,7 @@ def _undefine_glob(rel_path_glob: str) -> None: """ ), ) - _define( + wrkspc.new( "ns/Consistency.1.1.dsdl", dedent( """ @@ -1490,9 +1479,9 @@ def _undefine_glob(rel_path_glob: str) -> None: """ ), ) - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) # no error assert len(parsed) == 10 - _define( + wrkspc.new( "ns/Consistency.1.2.dsdl", dedent( """ @@ -1507,25 +1496,25 @@ def _undefine_glob(rel_path_glob: str) -> None: with raises( _namespace.ExtentConsistencyError, match=r"(?i).*extent of ns\.Consistency.* is 256 bits.*" ) as ei_extent: - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) print(ei_extent.value) assert ei_extent.value.path and "Consistency.1" in str(ei_extent.value.path) - _undefine_glob("ns/Consistency*") + wrkspc.drop("ns/Consistency*") # Sealing consistency -- non-service type - _define("ns/Consistency.1.0.dsdl", "uint64 a\n@extent 64") - _define("ns/Consistency.1.1.dsdl", "uint64 a\n@extent 64") - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error + wrkspc.new("ns/Consistency.1.0.dsdl", "uint64 a\n@extent 64") + wrkspc.new("ns/Consistency.1.1.dsdl", "uint64 a\n@extent 64") + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) # no error assert len(parsed) == 10 - _define("ns/Consistency.1.2.dsdl", "uint64 a\n@sealed") + wrkspc.new("ns/Consistency.1.2.dsdl", "uint64 a\n@sealed") with raises(_namespace.SealingConsistencyError, match=r"(?i).*ns\.Consistency\.1\.2 is sealed.*") as ei_sealing: - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) print(ei_sealing.value) assert ei_sealing.value.path and "Consistency.1" in str(ei_sealing.value.path) - _undefine_glob("ns/Consistency*") + wrkspc.drop("ns/Consistency*") # Sealing consistency -- request - _define( + wrkspc.new( "ns/Consistency.1.0.dsdl", dedent( """ @@ -1537,7 +1526,7 @@ def _undefine_glob(rel_path_glob: str) -> None: """ ), ) - _define( + wrkspc.new( "ns/Consistency.1.1.dsdl", dedent( """ @@ -1549,9 +1538,9 @@ def _undefine_glob(rel_path_glob: str) -> None: """ ), ) - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) # no error assert len(parsed) == 10 - _define( + wrkspc.new( "ns/Consistency.1.2.dsdl", dedent( """ @@ -1564,13 +1553,13 @@ def _undefine_glob(rel_path_glob: str) -> None: ), ) with raises(_namespace.SealingConsistencyError, match=r"(?i).*ns\.Consistency.* is sealed.*") as ei_sealing: - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) print(ei_sealing.value) assert ei_sealing.value.path and "Consistency.1" in str(ei_sealing.value.path) - _undefine_glob("ns/Consistency*") + wrkspc.drop("ns/Consistency*") # Sealing consistency -- response - _define( + wrkspc.new( "ns/Consistency.1.0.dsdl", dedent( """ @@ -1582,7 +1571,7 @@ def _undefine_glob(rel_path_glob: str) -> None: """ ), ) - _define( + wrkspc.new( "ns/Consistency.1.1.dsdl", dedent( """ @@ -1594,9 +1583,9 @@ def _undefine_glob(rel_path_glob: str) -> None: """ ), ) - parsed = _namespace.read_namespace(Path(directory.name, "ns"), []) # no error + parsed = _namespace.read_namespace((wrkspc.directory / "ns"), []) # no error assert len(parsed) == 10 - _define( + wrkspc.new( "ns/Consistency.1.2.dsdl", dedent( """ @@ -1609,41 +1598,50 @@ def _undefine_glob(rel_path_glob: str) -> None: ), ) with raises(_namespace.SealingConsistencyError, match=r"(?i).*ns\.Consistency.* is sealed.*") as ei_sealing: - _namespace.read_namespace(Path(directory.name, "ns"), []) + _namespace.read_namespace((wrkspc.directory / "ns"), []) print(ei_sealing.value) assert ei_sealing.value.path and "Consistency.1" in str(ei_sealing.value.path) - _undefine_glob("ns/Consistency*") + wrkspc.drop("ns/Consistency*") def _unittest_parse_namespace_faults() -> None: from pytest import raises - with raises(_namespace.NestedRootNamespaceError): - _namespace.read_namespace( - "/foo/bar/baz", ["/bat/wot", "/foo/bar/baz/bad"], allow_root_namespace_name_collision=False - ) - - with raises(_namespace.RootNamespaceNameCollisionError): - _namespace.read_namespace( - "/foo/bar/baz", ["/foo/bar/zoo", "/foo/bar/doo/roo/BAZ"], allow_root_namespace_name_collision=False - ) # Notice the letter case - - with raises(_namespace.RootNamespaceNameCollisionError): - _namespace.read_namespace( - "/foo/bar/baz", - ["/foo/bar/zoo", "/foo/bar/doo/roo/zoo", "/foo/bar/doo/roo/baz"], - allow_root_namespace_name_collision=False, - ) + with tempfile.TemporaryDirectory() as tmp_dir: + di = Path(tmp_dir) + (di / "foo/bar/baz").mkdir(parents=True) + (di / "bat/wot").mkdir(parents=True) + (di / "foo/bar/baz/bad").mkdir(parents=True) + (di / "foo/bar/zoo").mkdir(parents=True) + (di / "foo/bar/doo/roo/BAZ").mkdir(parents=True) + (di / "foo/bar/doo/roo/zoo").mkdir(parents=True) + (di / "foo/bar/doo/roo/baz").mkdir(parents=True, exist_ok=True) + with raises(_namespace.NestedRootNamespaceError): + _namespace.read_namespace( + di / "foo/bar/baz", + [di / "bat/wot", di / "foo/bar/baz/bad"], + ) + with raises(_namespace.RootNamespaceNameCollisionError): + _namespace.read_namespace( + di / "foo/bar/baz", + [di / "foo/bar/zoo", di / "foo/bar/doo/roo/BAZ"], # Notice the letter case + allow_root_namespace_name_collision=False, + ) + with raises(_namespace.RootNamespaceNameCollisionError): + _namespace.read_namespace( + di / "foo/bar/baz", + [di / "foo/bar/zoo", di / "foo/bar/doo/roo/zoo", di / "foo/bar/doo/roo/baz"], + allow_root_namespace_name_collision=False, + ) -@_in_n_out -def _unittest_inconsistent_deprecation() -> None: +def _unittest_inconsistent_deprecation(wrkspc: Workspace) -> None: from pytest import raises - _parse_definition( - _define("ns/A.1.0.dsdl", "@sealed"), + parse_definition( + wrkspc.parse_new("ns/A.1.0.dsdl", "@sealed"), [ - _define( + wrkspc.parse_new( "ns/B.1.0.dsdl", dedent( """ @@ -1657,8 +1655,8 @@ def _unittest_inconsistent_deprecation() -> None: ) with raises(_error.InvalidDefinitionError, match="(?i).*depend.*deprecated.*"): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/C.1.0.dsdl", dedent( """ @@ -1667,11 +1665,11 @@ def _unittest_inconsistent_deprecation() -> None: """ ), ), - [_define("ns/X.1.0.dsdl", "@deprecated\n@sealed")], + [wrkspc.parse_new("ns/X.1.0.dsdl", "@deprecated\n@sealed")], ) - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/D.1.0.dsdl", dedent( """ @@ -1681,16 +1679,15 @@ def _unittest_inconsistent_deprecation() -> None: """ ), ), - [_define("ns/X.1.0.dsdl", "@deprecated\n@sealed")], + [wrkspc.parse_new("ns/X.1.0.dsdl", "@deprecated\n@sealed")], ) -@_in_n_out -def _unittest_repeated_directives() -> None: +def _unittest_repeated_directives(wrkspc: Workspace) -> None: from pytest import raises - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( """ @@ -1706,8 +1703,8 @@ def _unittest_repeated_directives() -> None: ) with raises(_error.InvalidDefinitionError, match="(?i).*deprecated.*"): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( """ @@ -1721,8 +1718,8 @@ def _unittest_repeated_directives() -> None: ) with raises(_error.InvalidDefinitionError, match="(?i).*deprecated.*"): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( """ @@ -1737,8 +1734,8 @@ def _unittest_repeated_directives() -> None: [], ) - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( """ @@ -1758,8 +1755,8 @@ def _unittest_repeated_directives() -> None: ) with raises(_error.InvalidDefinitionError, match="(?i).*union.*"): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( """ @@ -1775,8 +1772,8 @@ def _unittest_repeated_directives() -> None: ) with raises(_error.InvalidDefinitionError, match="(?i).*sealed.*"): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( """ @@ -1792,8 +1789,8 @@ def _unittest_repeated_directives() -> None: ) with raises(_error.InvalidDefinitionError, match="(?i).*extent.*already set.*"): - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( """ @@ -1809,13 +1806,12 @@ def _unittest_repeated_directives() -> None: ) -@_in_n_out -def _unittest_dsdl_parser_basics() -> None: +def _unittest_dsdl_parser_basics(wrkspc: Workspace) -> None: # This is how you can run one test only for development needs: # pytest pydsdl -k _unittest_dsdl_parser_basics --capture=no # noinspection SpellCheckingInspection - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( r""" @@ -1838,19 +1834,18 @@ def _unittest_dsdl_parser_basics() -> None: ), ), [ - _define("ns/Foo.1.0.dsdl", "int8 THE_CONSTANT = 42\n@extent 1024"), - _define("ns/Bar.1.23.dsdl", "int8 the_field\nint8 A = 0xA\nint8 B = 0xB\n@extent 1024"), + wrkspc.parse_new("ns/Foo.1.0.dsdl", "int8 THE_CONSTANT = 42\n@extent 1024"), + wrkspc.parse_new("ns/Bar.1.23.dsdl", "int8 the_field\nint8 A = 0xA\nint8 B = 0xB\n@extent 1024"), ], ) -@_in_n_out -def _unittest_dsdl_parser_expressions() -> None: +def _unittest_dsdl_parser_expressions(wrkspc: Workspace) -> None: from pytest import raises - def throws(definition: str, exc: typing.Type[Exception] = _expression.InvalidOperandError) -> None: + def throws(definition: str, exc: Type[Exception] = _expression.InvalidOperandError) -> None: with raises(exc): - _parse_definition(_define("ns/Throws.0.1.dsdl", dedent(definition + "\n@sealed")), []) + parse_definition(wrkspc.parse_new("ns/Throws.0.1.dsdl", dedent(definition + "\n@sealed")), []) throws("bool R = true && 0") throws("bool R = true || 0") @@ -1885,8 +1880,8 @@ def throws(definition: str, exc: typing.Type[Exception] = _expression.InvalidOpe throws('bool R = true % "0"') throws('bool R = true ** "0"') - _parse_definition( - _define( + parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( r""" @@ -1942,12 +1937,11 @@ def throws(definition: str, exc: typing.Type[Exception] = _expression.InvalidOpe ) -@_in_n_out -def _unittest_pickle() -> None: +def _unittest_pickle(wrkspc: Workspace) -> None: import pickle - p = _parse_definition( - _define( + p = parse_definition( + wrkspc.parse_new( "ns/A.1.0.dsdl", dedent( r""" @@ -1976,7 +1970,7 @@ def _unittest_pickle() -> None: assert repr(pp) == repr(p) -def _collect_descendants(cls: typing.Type[object]) -> typing.Iterable[typing.Type[object]]: +def _collect_descendants(cls: Type[object]) -> Iterable[Type[object]]: # noinspection PyArgumentList for t in cls.__subclasses__(): yield t diff --git a/setup.cfg b/setup.cfg index 16ea5f5..1dc3a9d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -60,6 +60,7 @@ show_error_context = True strict_equality = True implicit_reexport = False incremental = False +exclude = pydsdl/third_party [mypy-pytest.*] ignore_missing_imports = True diff --git a/setup.py b/setup.py index b7965c7..b56fbe9 100755 --- a/setup.py +++ b/setup.py @@ -4,14 +4,6 @@ # Author: Pavel Kirienko # type: ignore -import sys import setuptools -if int(setuptools.__version__.split(".")[0]) < 30: - print( - "A newer version of setuptools is required. The current version does not support declarative config.", - file=sys.stderr, - ) - sys.exit(1) - setuptools.setup()