diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 0000000..fec6c63 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,24 @@ +{ + "name": "Python dev environment", + "image": "ghcr.io/opencyphal/toxic:tx22.4.2", + "workspaceFolder": "/workspace", + "workspaceMount": "source=${localWorkspaceFolder},target=/workspace,type=bind,consistency=delegated", + "mounts": [ + "source=root-vscode-server,target=/root/.vscode-server/extensions,type=volume", + "source=pydsdl-tox,target=/workspace/.nox,type=volume" + ], + "customizations": { + "vscode": { + "extensions": [ + "uavcan.dsdl", + "wholroyd.jinja", + "streetsidesoftware.code-spell-checker", + "ms-python.python", + "ms-python.mypy-type-checker", + "ms-python.black-formatter", + "ms-python.pylint" + ] + } + }, + "postCreateCommand": "git clone --depth 1 git@github.com:OpenCyphal/public_regulated_data_types.git .dsdl-test && nox -e test-3.12" +} diff --git a/.github/workflows/test-and-release.yml b/.github/workflows/test-and-release.yml index 9efebdd..705c4db 100644 --- a/.github/workflows/test-and-release.yml +++ b/.github/workflows/test-and-release.yml @@ -1,5 +1,5 @@ name: 'Test and Release PyDSDL' -on: push +on: [ push, pull_request ] # Ensures that only one workflow is running at a time concurrency: @@ -9,11 +9,14 @@ concurrency: jobs: pydsdl-test: name: Test PyDSDL + # Run on push OR on 3rd-party PR. 
+ # https://docs.github.com/en/webhooks/webhook-events-and-payloads?actionType=edited#pull_request + if: (github.event_name == 'push') || github.event.pull_request.head.repo.fork strategy: fail-fast: false matrix: os: [ ubuntu-latest, macos-latest ] - python: [ '3.7', '3.8', '3.9', '3.10', '3.11' ] + python: [ '3.8', '3.9', '3.10', '3.11', '3.12' ] include: - os: windows-2019 python: '3.10' @@ -22,10 +25,10 @@ jobs: runs-on: ${{ matrix.os }} steps: - name: Check out pydsdl - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Check out public_regulated_data_types - uses: actions/checkout@v3 + uses: actions/checkout@v4 with: repository: OpenCyphal/public_regulated_data_types path: .dsdl-test @@ -50,27 +53,28 @@ jobs: - name: Run build and test run: | if [ "$RUNNER_OS" == "Linux" ]; then - nox --non-interactive --error-on-missing-interpreters --session test test_eol pristine lint --python ${{ matrix.python }} + nox --non-interactive --error-on-missing-interpreters --session test pristine lint --python ${{ matrix.python }} nox --non-interactive --session docs elif [ "$RUNNER_OS" == "Windows" ]; then - nox --forcecolor --non-interactive --error-on-missing-interpreters --session test test_eol pristine lint + nox --forcecolor --non-interactive --error-on-missing-interpreters --session test pristine lint elif [ "$RUNNER_OS" == "macOS" ]; then - nox --non-interactive --error-on-missing-interpreters --session test test_eol pristine lint --python ${{ matrix.python }} + nox --non-interactive --error-on-missing-interpreters --session test pristine lint --python ${{ matrix.python }} else echo "${{ runner.os }} not supported" exit 1 fi - python -c "import pydsdl; pydsdl.read_namespace('.dsdl-test/uavcan', [])" shell: bash pydsdl-release: name: Release PyDSDL - if: contains(github.event.head_commit.message, '#release') || contains(github.ref, '/master') + if: > + (github.event_name == 'push') && + (contains(github.event.head_commit.message, '#release') || 
contains(github.ref, '/master')) needs: pydsdl-test runs-on: ubuntu-latest steps: - name: Check out - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Build distribution run: | diff --git a/.readthedocs.yml b/.readthedocs.yml index 97fb527..3cebaef 100644 --- a/.readthedocs.yml +++ b/.readthedocs.yml @@ -2,6 +2,13 @@ version: 2 +build: + os: ubuntu-22.04 + tools: + python: "3.12" + apt_packages: + - graphviz + sphinx: configuration: docs/conf.py fail_on_warning: true diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..fbc7eb1 --- /dev/null +++ b/conftest.py @@ -0,0 +1,63 @@ +# +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. +# SPDX-License-Identifier: MIT +# +""" +Configuration for pytest tests including fixtures and hooks. +""" + +import tempfile +from pathlib import Path +from typing import Any, Optional + +import pytest + + +# +-------------------------------------------------------------------------------------------------------------------+ +# | TEST FIXTURES +# +-------------------------------------------------------------------------------------------------------------------+ +class TemporaryDsdlContext: + """ + Powers the temp_dsdl_factory test fixture. + """ + def __init__(self) -> None: + self._base_dir: Optional[Any] = None + + def new_file(self, file_path: Path, text: Optional[str] = None) -> Path: + if file_path.is_absolute(): + raise ValueError(f"{file_path} is an absolute path. The test fixture requires relative paths to work.") + file = self.base_dir / file_path + file.parent.mkdir(parents=True, exist_ok=True) + if text is not None: + file.write_text(text) + return file + + @property + def base_dir(self) -> Path: + if self._base_dir is None: + self._base_dir = tempfile.TemporaryDirectory() + return Path(self._base_dir.name).resolve() + + def _test_path_finalizer(self) -> None: + """ + Finalizer to clean up any temporary directories created during the test. 
+ """ + if self._base_dir is not None: + self._base_dir.cleanup() + del self._base_dir + self._base_dir = None + +@pytest.fixture(scope="function") +def temp_dsdl_factory(request: pytest.FixtureRequest) -> Any: # pylint: disable=unused-argument + """ + Fixture for pydsdl tests that have to create files as part of the test. This object stays in-scope for a given + test method and does not requires a context manager in the test itself. + + Call `new_file(path)` to create a new file path in the fixture's temporary directory. This will create all + uncreated parent directories but will _not_ create the file unless text is provided: `new_file(path, "hello")` + """ + f = TemporaryDsdlContext() + request.addfinalizer(f._test_path_finalizer) # pylint: disable=protected-access + return f + diff --git a/docs/index.rst b/docs/index.rst index 0d15657..ff3bf19 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -15,6 +15,7 @@ Contents -------- .. toctree:: + :maxdepth: 2 pages/installation pages/pydsdl diff --git a/docs/pages/pydsdl.rst b/docs/pages/pydsdl.rst index 4e73ac0..9a9a7c4 100644 --- a/docs/pages/pydsdl.rst +++ b/docs/pages/pydsdl.rst @@ -12,10 +12,11 @@ You can find a practical usage example in the Nunavut code generation library th :local: -The main function -+++++++++++++++++ +The main functions +++++++++++++++++++ .. autofunction:: pydsdl.read_namespace +.. 
autofunction:: pydsdl.read_files Type model diff --git a/docs/requirements.txt b/docs/requirements.txt index 9dc2a91..81ca5c8 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,3 +1,3 @@ -sphinx == 4.4.0 -sphinx_rtd_theme == 1.0.0 +sphinx == 7.1.2 # this is the last version that supports Python 3.8 +sphinx_rtd_theme == 2.0.0 sphinx-computron >= 0.2, < 2.0 diff --git a/noxfile.py b/noxfile.py index 6939d8e..52b9780 100644 --- a/noxfile.py +++ b/noxfile.py @@ -10,7 +10,7 @@ import nox -PYTHONS = ["3.8", "3.9", "3.10", "3.11"] +PYTHONS = ["3.8", "3.9", "3.10", "3.11", "3.12"] """The newest supported Python shall be listed LAST.""" nox.options.error_on_external_run = True @@ -33,7 +33,6 @@ def clean(session): "*.log", "*.tmp", ".nox", - ".dsdl-test", ] for w in wildcards: for f in Path.cwd().glob(w): @@ -49,9 +48,9 @@ def test(session): session.log("Using the newest supported Python: %s", is_latest_python(session)) session.install("-e", ".") session.install( - "pytest ~= 7.3", - "pytest-randomly ~= 3.12", - "coverage ~= 7.2", + "pytest ~= 8.1", + "pytest-randomly ~= 3.15", + "coverage ~= 7.5", ) session.run("coverage", "run", "-m", "pytest") session.run("coverage", "report", "--fail-under=95") @@ -61,14 +60,6 @@ def test(session): session.log(f"OPEN IN WEB BROWSER: file://{report_file}") -@nox.session(python=["3.7"]) -def test_eol(session): - """This is a minimal test session for those old Pythons that have EOLed.""" - session.install("-e", ".") - session.install("pytest") - session.run("pytest") - - @nox.session(python=PYTHONS) def pristine(session): """ @@ -85,8 +76,9 @@ def pristine(session): def lint(session): session.log("Using the newest supported Python: %s", is_latest_python(session)) session.install( - "mypy ~= 1.2.0", - "pylint ~= 2.17.2", + "mypy ~= 1.10", + "types-parsimonious", + "pylint ~= 3.2", ) session.run( "mypy", @@ -105,7 +97,8 @@ def lint(session): }, ) if is_latest_python(session): - session.install("black ~= 23.3") + # we run 
black only on the newest Python version to ensure that the code is formatted with the latest version + session.install("black ~= 24.4") session.run("black", "--check", ".") diff --git a/pydsdl/__init__.py b/pydsdl/__init__.py index d07f1f2..aea7d00 100644 --- a/pydsdl/__init__.py +++ b/pydsdl/__init__.py @@ -7,7 +7,7 @@ import sys as _sys from pathlib import Path as _Path -__version__ = "1.20.1" +__version__ = "1.21.0" __version_info__ = tuple(map(int, __version__.split(".")[:3])) __license__ = "MIT" __author__ = "OpenCyphal" @@ -25,8 +25,9 @@ _sys.path = [str(_Path(__file__).parent / "third_party")] + _sys.path # Never import anything that is not available here - API stability guarantees are only provided for the exposed items. +from ._dsdl import PrintOutputHandler as PrintOutputHandler from ._namespace import read_namespace as read_namespace -from ._namespace import PrintOutputHandler as PrintOutputHandler +from ._namespace import read_files as read_files # Error model. from ._error import FrontendError as FrontendError diff --git a/pydsdl/_bit_length_set/_symbolic_test.py b/pydsdl/_bit_length_set/_symbolic_test.py index db262fa..1c414d7 100644 --- a/pydsdl/_bit_length_set/_symbolic_test.py +++ b/pydsdl/_bit_length_set/_symbolic_test.py @@ -2,7 +2,6 @@ # This software is distributed under the terms of the MIT License. 
# Author: Pavel Kirienko -import typing import random import itertools from ._symbolic import NullaryOperator, validate_numerically @@ -140,7 +139,7 @@ def _unittest_repetition() -> None: ) assert op.min == 7 * 3 assert op.max == 17 * 3 - assert set(op.expand()) == set(map(sum, itertools.combinations_with_replacement([7, 11, 17], 3))) # type: ignore + assert set(op.expand()) == set(map(sum, itertools.combinations_with_replacement([7, 11, 17], 3))) assert set(op.expand()) == {21, 25, 29, 31, 33, 35, 39, 41, 45, 51} assert set(op.modulo(7)) == {0, 1, 2, 3, 4, 5, 6} assert set(op.modulo(8)) == {1, 3, 5, 7} @@ -149,7 +148,7 @@ def _unittest_repetition() -> None: for _ in range(1): child = NullaryOperator(random.randint(0, 100) for _ in range(random.randint(1, 10))) k = random.randint(0, 10) - ref = set(map(sum, itertools.combinations_with_replacement(child.expand(), k))) # type: ignore + ref = set(map(sum, itertools.combinations_with_replacement(child.expand(), k))) op = RepetitionOperator(child, k) assert set(op.expand()) == ref @@ -157,7 +156,7 @@ def _unittest_repetition() -> None: assert op.max == max(child.expand()) * k div = random.randint(1, 64) - assert set(op.modulo(div)) == {typing.cast(int, x) % div for x in ref} + assert set(op.modulo(div)) == {x % div for x in ref} validate_numerically(op) @@ -173,9 +172,9 @@ def _unittest_range_repetition() -> None: assert op.max == 17 * 3 assert set(op.expand()) == ( {0} - | set(map(sum, itertools.combinations_with_replacement([7, 11, 17], 1))) # type: ignore - | set(map(sum, itertools.combinations_with_replacement([7, 11, 17], 2))) # type: ignore - | set(map(sum, itertools.combinations_with_replacement([7, 11, 17], 3))) # type: ignore + | set(map(sum, itertools.combinations_with_replacement([7, 11, 17], 1))) + | set(map(sum, itertools.combinations_with_replacement([7, 11, 17], 2))) + | set(map(sum, itertools.combinations_with_replacement([7, 11, 17], 3))) ) assert set(op.expand()) == {0, 7, 11, 14, 17, 18, 21, 22, 24, 
25, 28, 29, 31, 33, 34, 35, 39, 41, 45, 51} assert set(op.modulo(7)) == {0, 1, 2, 3, 4, 5, 6} @@ -197,10 +196,7 @@ def _unittest_range_repetition() -> None: k_max = random.randint(0, 10) ref = set( itertools.chain( - *( - map(sum, itertools.combinations_with_replacement(child.expand(), k)) # type: ignore - for k in range(k_max + 1) - ) + *(map(sum, itertools.combinations_with_replacement(child.expand(), k)) for k in range(k_max + 1)) ) ) op = RangeRepetitionOperator(child, k_max) @@ -210,7 +206,7 @@ def _unittest_range_repetition() -> None: assert op.max == max(child.expand()) * k_max div = random.randint(1, 64) - assert set(op.modulo(div)) == {typing.cast(int, x) % div for x in ref} + assert set(op.modulo(div)) == {x % div for x in ref} validate_numerically(op) diff --git a/pydsdl/_data_type_builder.py b/pydsdl/_data_type_builder.py index 4572da3..96128d0 100644 --- a/pydsdl/_data_type_builder.py +++ b/pydsdl/_data_type_builder.py @@ -2,16 +2,13 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko -from typing import Optional, Callable, Iterable +from __future__ import annotations import logging from pathlib import Path -from . import _serializable -from . import _expression -from . import _error -from . import _dsdl_definition -from . import _parser -from . import _data_schema_builder -from . import _port_id_ranges +from typing import Callable, Iterable + +from . 
import _data_schema_builder, _error, _expression, _parser, _port_id_ranges, _serializable +from ._dsdl import DefinitionVisitor, ReadableDSDLFile class AssertionCheckFailureError(_error.InvalidDefinitionError): @@ -42,21 +39,25 @@ class MissingSerializationModeError(_error.InvalidDefinitionError): class DataTypeBuilder(_parser.StatementStreamProcessor): + + # pylint: disable=too-many-arguments def __init__( self, - definition: _dsdl_definition.DSDLDefinition, - lookup_definitions: Iterable[_dsdl_definition.DSDLDefinition], + definition: ReadableDSDLFile, + lookup_definitions: Iterable[ReadableDSDLFile], + definition_visitors: Iterable[DefinitionVisitor], print_output_handler: Callable[[int, str], None], allow_unregulated_fixed_port_id: bool, ): self._definition = definition self._lookup_definitions = list(lookup_definitions) + self._definition_visitors = definition_visitors self._print_output_handler = print_output_handler self._allow_unregulated_fixed_port_id = allow_unregulated_fixed_port_id - self._element_callback = None # type: Optional[Callable[[str], None]] + self._element_callback = None # type: Callable[[str], None] | None - assert isinstance(self._definition, _dsdl_definition.DSDLDefinition) - assert all(map(lambda x: isinstance(x, _dsdl_definition.DSDLDefinition), lookup_definitions)) + assert isinstance(self._definition, ReadableDSDLFile) + assert all(map(lambda x: isinstance(x, ReadableDSDLFile), lookup_definitions)) assert callable(self._print_output_handler) assert isinstance(self._allow_unregulated_fixed_port_id, bool) @@ -65,7 +66,7 @@ def __init__( def finalize(self) -> _serializable.CompositeType: if len(self._structs) == 1: # Structure type - (builder,) = self._structs # type: _data_schema_builder.DataSchemaBuilder, + (builder,) = self._structs out = self._make_composite( builder=builder, name=self._definition.full_name, @@ -148,7 +149,7 @@ def on_padding_field(self, padding_field_type: _serializable.VoidType) -> None: ) def on_directive( - 
self, line_number: int, directive_name: str, associated_expression_value: Optional[_expression.Any] + self, line_number: int, directive_name: str, associated_expression_value: _expression.Any | None ) -> None: try: handler = { @@ -221,15 +222,20 @@ def resolve_versioned_data_type(self, name: str, version: _serializable.Version) raise _error.InternalError("Conflicting definitions: %r" % found) target_definition = found[0] - assert isinstance(target_definition, _dsdl_definition.DSDLDefinition) + for visitor in self._definition_visitors: + visitor.on_definition(self._definition, target_definition) + + assert isinstance(target_definition, ReadableDSDLFile) assert target_definition.full_name == full_name assert target_definition.version == version # Recursion is cool. - return target_definition.read( + dt = target_definition.read( lookup_definitions=self._lookup_definitions, + definition_visitors=self._definition_visitors, print_output_handler=self._print_output_handler, allow_unregulated_fixed_port_id=self._allow_unregulated_fixed_port_id, ) + return dt def _queue_attribute(self, element_callback: Callable[[str], None]) -> None: self._flush_attribute("") @@ -247,7 +253,7 @@ def _on_attribute(self) -> None: "This is to prevent errors if the extent is dependent on the bit length set of the data schema." 
) - def _on_print_directive(self, line_number: int, value: Optional[_expression.Any]) -> None: + def _on_print_directive(self, line_number: int, value: _expression.Any | None) -> None: _logger.info( "Print directive at %s:%d%s", self._definition.file_path, @@ -256,7 +262,7 @@ def _on_print_directive(self, line_number: int, value: Optional[_expression.Any] ) self._print_output_handler(line_number, str(value if value is not None else "")) - def _on_assert_directive(self, line_number: int, value: Optional[_expression.Any]) -> None: + def _on_assert_directive(self, line_number: int, value: _expression.Any | None) -> None: if isinstance(value, _expression.Boolean): if not value.native_value: raise AssertionCheckFailureError( @@ -268,7 +274,7 @@ def _on_assert_directive(self, line_number: int, value: Optional[_expression.Any else: raise InvalidDirectiveError("The assertion check expression must yield a boolean, not %s" % value.TYPE_NAME) - def _on_extent_directive(self, line_number: int, value: Optional[_expression.Any]) -> None: + def _on_extent_directive(self, line_number: int, value: _expression.Any | None) -> None: if self._structs[-1].serialization_mode is not None: raise InvalidDirectiveError( "Misplaced extent directive. The serialization mode is already set to %s" @@ -284,7 +290,7 @@ def _on_extent_directive(self, line_number: int, value: Optional[_expression.Any else: raise InvalidDirectiveError("The extent directive expects a rational, not %s" % value.TYPE_NAME) - def _on_sealed_directive(self, _ln: int, value: Optional[_expression.Any]) -> None: + def _on_sealed_directive(self, _ln: int, value: _expression.Any | None) -> None: if self._structs[-1].serialization_mode is not None: raise InvalidDirectiveError( "Misplaced sealing directive. 
The serialization mode is already set to %s" @@ -294,7 +300,7 @@ def _on_sealed_directive(self, _ln: int, value: Optional[_expression.Any]) -> No raise InvalidDirectiveError("The sealed directive does not expect an expression") self._structs[-1].set_serialization_mode(_data_schema_builder.SealedSerializationMode()) - def _on_union_directive(self, _ln: int, value: Optional[_expression.Any]) -> None: + def _on_union_directive(self, _ln: int, value: _expression.Any | None) -> None: if value is not None: raise InvalidDirectiveError("The union directive does not expect an expression") if self._structs[-1].union: @@ -303,7 +309,7 @@ def _on_union_directive(self, _ln: int, value: Optional[_expression.Any]) -> Non raise InvalidDirectiveError("The union directive must be placed before the first " "attribute definition") self._structs[-1].make_union() - def _on_deprecated_directive(self, _ln: int, value: Optional[_expression.Any]) -> None: + def _on_deprecated_directive(self, _ln: int, value: _expression.Any | None) -> None: if value is not None: raise InvalidDirectiveError("The deprecated directive does not expect an expression") if self._is_deprecated: @@ -322,7 +328,7 @@ def _make_composite( # pylint: disable=too-many-arguments name: str, version: _serializable.Version, deprecated: bool, - fixed_port_id: Optional[int], + fixed_port_id: int | None, source_file_path: Path, has_parent_service: bool, ) -> _serializable.CompositeType: diff --git a/pydsdl/_dsdl.py b/pydsdl/_dsdl.py new file mode 100644 index 0000000..9cc007a --- /dev/null +++ b/pydsdl/_dsdl.py @@ -0,0 +1,231 @@ +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. 
+# SPDX-License-Identifier: MIT + +from __future__ import annotations +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any, Callable, Iterable, TypeVar, List, Tuple + +from ._serializable import CompositeType, Version + +PrintOutputHandler = Callable[[Path, int, str], None] +"""Invoked when the frontend encounters a print directive or needs to output a generic diagnostic.""" + + +class DSDLFile(ABC): + """ + Interface for DSDL files. This interface is used by the parser to abstract DSDL type details inferred from the + filesystem. Where properties are duplicated between the composite type and this file the composite type is to be + considered canonical. The properties directly on this class are inferred from the dsdl file path before the + composite type has been parsed. + """ + + @property + @abstractmethod + def composite_type(self) -> CompositeType | None: + """The composite type that was read from the DSDL file or None if the type has not been parsed yet.""" + raise NotImplementedError() + + @property + @abstractmethod + def full_name(self) -> str: + """The full name, e.g., uavcan.node.Heartbeat""" + raise NotImplementedError() + + @property + def name_components(self) -> List[str]: + """Components of the full name as a list, e.g., ['uavcan', 'node', 'Heartbeat']""" + raise NotImplementedError() + + @property + @abstractmethod + def short_name(self) -> str: + """The last component of the full name, e.g., Heartbeat of uavcan.node.Heartbeat""" + raise NotImplementedError() + + @property + @abstractmethod + def full_namespace(self) -> str: + """The full name without the short name, e.g., uavcan.node for uavcan.node.Heartbeat""" + raise NotImplementedError() + + @property + @abstractmethod + def root_namespace(self) -> str: + """The first component of the full name, e.g., uavcan of uavcan.node.Heartbeat""" + raise NotImplementedError() + + @property + @abstractmethod + def text(self) -> str: + """The source text in its raw 
unprocessed form (with comments, formatting intact, and everything)""" + raise NotImplementedError() + + @property + @abstractmethod + def version(self) -> Version: + """ + The version of the DSDL definition. + """ + raise NotImplementedError() + + @property + @abstractmethod + def fixed_port_id(self) -> int | None: + """Either the fixed port ID as integer, or None if not defined for this type.""" + raise NotImplementedError() + + @property + @abstractmethod + def has_fixed_port_id(self) -> bool: + """ + If the type has a fixed port ID defined, this method returns True. Equivalent to ``fixed_port_id is not None``. + """ + raise NotImplementedError() + + @property + @abstractmethod + def file_path(self) -> Path: + """The path to the DSDL file on the filesystem.""" + raise NotImplementedError() + + @property + @abstractmethod + def root_namespace_path(self) -> Path: + """ + The path to the root namespace directory on the filesystem. + """ + raise NotImplementedError() + + +class ReadableDSDLFile(DSDLFile): + """ + A DSDL file that can construct a composite type from its contents. + """ + + @abstractmethod + def read( + self, + lookup_definitions: Iterable["ReadableDSDLFile"], + definition_visitors: Iterable["DefinitionVisitor"], + print_output_handler: Callable[[int, str], None], + allow_unregulated_fixed_port_id: bool, + ) -> CompositeType: + """ + Reads the data type definition and returns its high-level data type representation. + The output should be cached; all following invocations should read from this cache. + Caching is very important, because it is expected that the same definition may be referred to multiple + times (e.g., for composition or when accessing external constants). Re-processing a definition every time + it is accessed would be a huge waste of time. 
+ Note, however, that this may lead to unexpected complications if one is attempting to re-read a definition + with different inputs (e.g., different lookup paths) expecting to get a different result: caching would + get in the way. That issue is easy to avoid by creating a new instance of the object. + :param lookup_definitions: List of definitions available for referring to. + :param definition_visitors: Visitors to notify about discovered dependencies. + :param print_output_handler: Used for @print and for diagnostics: (line_number, text) -> None. + :param allow_unregulated_fixed_port_id: Do not complain about fixed unregulated port IDs. + :return: The data type representation. + """ + raise NotImplementedError() + + +class DefinitionVisitor(ABC): + """ + A visitor that is notified about discovered dependencies. + """ + + @abstractmethod + def on_definition(self, target_dsdl_file: DSDLFile, dependency_dsdl_file: ReadableDSDLFile) -> None: + """ + Called by the parser after it finds a dependent type but before it parses a file in a lookup namespace. + :param target_dsdl_file: The target DSDL file that has dependencies the parser is searching for. + :param dependency_dsdl_file: The dependency of target_dsdl_file that the parser is about to parse. + """ + raise NotImplementedError() + + +SortedFileT = TypeVar("SortedFileT", DSDLFile, ReadableDSDLFile, CompositeType) +SortedFileList = List[SortedFileT] +"""A list of DSDL files sorted by name, newest version first.""" + + +def get_definition_ordering_rank(d: DSDLFile | CompositeType) -> Tuple[str, int, int]: + return d.full_name, -d.version.major, -d.version.minor + + +def file_sort(file_list: Iterable[SortedFileT]) -> SortedFileList[SortedFileT]: + """ + Sorts a list of DSDL files lexicographically by name, newest version first. 
+ """ + return list(sorted(file_list, key=get_definition_ordering_rank)) + + +def normalize_paths_argument_to_list(namespaces_or_namespace: None | Path | str | Iterable[Path | str]) -> List[Path]: + """ + Normalizes the input argument to a list of paths. + """ + if namespaces_or_namespace is None: + return [] + if isinstance(namespaces_or_namespace, (Path, str)): + return [Path(namespaces_or_namespace)] + + def _convert(arg: Any) -> Path: + if not isinstance(arg, (str, Path)): + raise TypeError(f"Invalid type: {type(arg)}") + return Path(arg) if isinstance(arg, str) else arg + + value_set = set() + + def _filter_duplicate_paths(arg: Any) -> bool: + if arg in value_set: + return False + value_set.add(arg) + return True + + converted = [_convert(arg) for arg in namespaces_or_namespace] + return list(filter(_filter_duplicate_paths, converted)) + + +# +-[UNIT TESTS]------------------------------------------------------------------------------------------------------+ + + +def _unittest_dsdl_normalize_paths_argument_to_list() -> None: + + from pytest import raises as assert_raises + + # Test with None argument + result = normalize_paths_argument_to_list(None) + assert result == [] + + # Test with single string argument + result = normalize_paths_argument_to_list("path/to/namespace") + assert result == [Path("path/to/namespace")] + + # Test with single Path argument + result = normalize_paths_argument_to_list(Path("path/to/namespace")) + assert result == [Path("path/to/namespace")] + + # Test with list of strings argument + result = normalize_paths_argument_to_list(["path/to/namespace1", "path/to/namespace2"]) + assert result == [Path("path/to/namespace1"), Path("path/to/namespace2")] + + # Test with list of Path arguments + result = normalize_paths_argument_to_list([Path("path/to/namespace1"), Path("path/to/namespace2")]) + assert result == [Path("path/to/namespace1"), Path("path/to/namespace2")] + + # Test with mixed list of strings and Path arguments + result = 
normalize_paths_argument_to_list(["path/to/namespace1", Path("path/to/namespace2")]) + assert result == [Path("path/to/namespace1"), Path("path/to/namespace2")] + + # Test de-duplication + result = normalize_paths_argument_to_list(["path/to/namespace1", "path/to/namespace1"]) + assert result == [Path("path/to/namespace1")] + + # Test with invalid argument type + with assert_raises(TypeError): + normalize_paths_argument_to_list(42) # type: ignore + + # Test with invalid argument type + with assert_raises(TypeError): + normalize_paths_argument_to_list([42]) # type: ignore diff --git a/pydsdl/_dsdl_definition.py b/pydsdl/_dsdl_definition.py index a8114da..d1f15b7 100644 --- a/pydsdl/_dsdl_definition.py +++ b/pydsdl/_dsdl_definition.py @@ -2,14 +2,17 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko -import time -from typing import Iterable, Callable, Optional, List +from __future__ import annotations import logging +import time from pathlib import Path -from ._error import FrontendError, InvalidDefinitionError, InternalError -from ._serializable import CompositeType, Version -from . import _parser +from typing import Callable, Iterable, Type +from . import _parser +from ._data_type_builder import DataTypeBuilder, UndefinedDataTypeError +from ._dsdl import DefinitionVisitor, ReadableDSDLFile +from ._error import FrontendError, InternalError, InvalidDefinitionError +from ._serializable import CompositeType, Version _logger = logging.getLogger(__name__) @@ -23,32 +26,113 @@ def __init__(self, text: str, path: Path): super().__init__(text=text, path=Path(path)) -class DSDLDefinition: +class PathInferenceError(UndefinedDataTypeError): + """ + Raised when the namespace, type, fixed port ID, or version cannot be inferred from a file path. 
+ """ + + def __init__(self, text: str, dsdl_path: Path, valid_dsdl_roots: list[Path]): + super().__init__(text=text, path=Path(dsdl_path)) + self.valid_dsdl_roots = valid_dsdl_roots[:] if valid_dsdl_roots is not None else None + + +class DSDLDefinition(ReadableDSDLFile): """ A DSDL type definition source abstracts the filesystem level details away, presenting a higher-level interface that operates solely on the level of type names, namespaces, fixed identifiers, and so on. Upper layers that operate on top of this abstraction do not concern themselves with the file system at all. + + :param file_path: The path to the DSDL file. + :param root_namespace_path: The path to the root namespace directory. `file_path` must be a descendant of this path. + See `from_first_in` for a more flexible way to create a DSDLDefinition object. """ + @classmethod + def _infer_path_to_root_from_first_found(cls, dsdl_path: Path, valid_dsdl_roots: list[Path]) -> Path: + """ + See `from_first_in` for documentation on this logic. + :return The path to the root namespace directory. + """ + if valid_dsdl_roots is None: + raise ValueError("valid_dsdl_roots was None") + + if dsdl_path.is_absolute() and len(valid_dsdl_roots) == 0: + raise PathInferenceError( + f"dsdl_path ({dsdl_path}) is absolute and the provided valid root names are empty. The DSDL root of " + "an absolute path cannot be inferred without this information.", + dsdl_path, + valid_dsdl_roots, + ) + + if len(valid_dsdl_roots) == 0: + # if we have no valid roots we can only infer the root of the path. We require the path to be relative + # to avoid accidental inferences given that dsdl file trees starting from a filesystem root are rare. + return Path(dsdl_path.parts[0]) + + # INFERENCE 1: The strongest inference is when the path is relative to a known root. + resolved_dsdl_path = dsdl_path.resolve(strict=False) if dsdl_path.is_absolute() else None + for path_to_root in valid_dsdl_roots: + # First we try the paths as-is... 
+ try: + _ = dsdl_path.relative_to(path_to_root) + except ValueError: + pass + else: + return path_to_root + # then we try resolving the root path if it is absolute + if path_to_root.is_absolute() and resolved_dsdl_path is not None: + path_to_root_resolved = path_to_root.resolve(strict=False) + try: + _ = resolved_dsdl_path.relative_to(path_to_root_resolved).parent + except ValueError: + pass + else: + return path_to_root_resolved + + # INFERENCE 2: A weaker, but valid inference is when the path is a child of a known root folder name. + root_parts = [x.parts[-1] for x in valid_dsdl_roots if len(x.parts) == 1] + parts = list(dsdl_path.parent.parts) + for i, part in list(enumerate(parts)): + if part in root_parts: + return Path().joinpath(*parts[: i + 1]) + # +1 to include the root folder + raise PathInferenceError(f"No valid root found in path {str(dsdl_path)}", dsdl_path, valid_dsdl_roots) + + @classmethod + def from_first_in(cls: Type["DSDLDefinition"], dsdl_path: Path, valid_dsdl_roots: list[Path]) -> "DSDLDefinition": + """ + Creates a DSDLDefinition object by inferring the path to the namespace root of a DSDL file given a set + of valid roots. The logic used prefers an instance of dsdl_path found to exist under a valid root but + will degrade to pure-path string matching if no file is found. Because this logic uses the first root path + that passes one of these two inferences the order of the valid_dsdl_roots list matters. + + :param dsdl_path: The path to the alleged DSDL file. + :param valid_dsdl_roots: The ordered set of valid root names or paths under which the type must reside. + This argument is accepted as a list for ordering but no de-duplication is performed + as the caller is expected to provide a correct set of paths. + :return A new DSDLDefinition object + :raises PathInferenceError: If the namespace root cannot be inferred from the provided information. 
+ """ + return cls(dsdl_path, cls._infer_path_to_root_from_first_found(dsdl_path, valid_dsdl_roots)) + def __init__(self, file_path: Path, root_namespace_path: Path): + """ """ # Normalizing the path and reading the definition text self._file_path = Path(file_path) del file_path self._root_namespace_path = Path(root_namespace_path) del root_namespace_path - with open(self._file_path) as f: - self._text = str(f.read()) + self._text: str | None = None # Checking the sanity of the root directory path - can't contain separators if CompositeType.NAME_COMPONENT_SEPARATOR in self._root_namespace_path.name: raise FileNameFormatError("Invalid namespace name", path=self._root_namespace_path) - # Determining the relative path within the root namespace directory relative_path = self._root_namespace_path.name / self._file_path.relative_to(self._root_namespace_path) # Parsing the basename, e.g., 434.GetTransportStatistics.0.1.dsdl basename_components = relative_path.name.split(".")[:-1] - str_fixed_port_id: Optional[str] = None + str_fixed_port_id: str | None = None if len(basename_components) == 4: str_fixed_port_id, short_name, str_major_version, str_minor_version = basename_components elif len(basename_components) == 3: @@ -59,7 +143,7 @@ def __init__(self, file_path: Path, root_namespace_path: Path): # Parsing the fixed port ID, if specified; None if not if str_fixed_port_id is not None: try: - self._fixed_port_id: Optional[int] = int(str_fixed_port_id) + self._fixed_port_id: int | None = int(str_fixed_port_id) except ValueError: raise FileNameFormatError( "Not a valid fixed port-ID: %s. 
" @@ -84,33 +168,26 @@ def __init__(self, file_path: Path, root_namespace_path: Path): raise FileNameFormatError(f"Invalid name for namespace component: {nc!r}", path=self._file_path) self._name: str = CompositeType.NAME_COMPONENT_SEPARATOR.join(namespace_components + [str(short_name)]) - self._cached_type: Optional[CompositeType] = None + self._cached_type: CompositeType | None = None + # +-----------------------------------------------------------------------+ + # | ReadableDSDLFile :: INTERFACE | + # +-----------------------------------------------------------------------+ def read( self, - lookup_definitions: Iterable["DSDLDefinition"], + lookup_definitions: Iterable[ReadableDSDLFile], + definition_visitors: Iterable[DefinitionVisitor], print_output_handler: Callable[[int, str], None], allow_unregulated_fixed_port_id: bool, ) -> CompositeType: - """ - Reads the data type definition and returns its high-level data type representation. - The output is cached; all following invocations will read from the cache. - Caching is very important, because it is expected that the same definition may be referred to multiple - times (e.g., for composition or when accessing external constants). Re-processing a definition every time - it is accessed would be a huge waste of time. - Note, however, that this may lead to unexpected complications if one is attempting to re-read a definition - with different inputs (e.g., different lookup paths) expecting to get a different result: caching would - get in the way. That issue is easy to avoid by creating a new instance of the object. - :param lookup_definitions: List of definitions available for referring to. - :param print_output_handler: Used for @print and for diagnostics: (line_number, text) -> None. - :param allow_unregulated_fixed_port_id: Do not complain about fixed unregulated port IDs. - :return: The data type representation. 
- """ log_prefix = "%s.%d.%d" % (self.full_name, self.version.major, self.version.minor) if self._cached_type is not None: _logger.debug("%s: Cache hit", log_prefix) return self._cached_type + if not self._file_path.exists(): + raise InvalidDefinitionError("Attempt to read DSDL file that doesn't exist.", self._file_path) + started_at = time.monotonic() # Remove the target definition from the lookup list in order to prevent @@ -124,17 +201,17 @@ def read( ", ".join(set(sorted(map(lambda x: x.root_namespace, lookup_definitions)))), ) try: - builder = _data_type_builder.DataTypeBuilder( + builder = DataTypeBuilder( definition=self, lookup_definitions=lookup_definitions, + definition_visitors=definition_visitors, print_output_handler=print_output_handler, allow_unregulated_fixed_port_id=allow_unregulated_fixed_port_id, ) - with open(self.file_path) as f: - _parser.parse(f.read(), builder) - self._cached_type = builder.finalize() + _parser.parse(self.text, builder) + self._cached_type = builder.finalize() _logger.info( "%s: Processed in %.0f ms; category: %s, fixed port ID: %s", log_prefix, @@ -151,34 +228,38 @@ def read( except Exception as ex: # pragma: no cover raise InternalError(culprit=ex, path=self.file_path) from ex + # +-----------------------------------------------------------------------+ + # | DSDLFile :: INTERFACE | + # +-----------------------------------------------------------------------+ + @property + def composite_type(self) -> CompositeType | None: + return self._cached_type + @property def full_name(self) -> str: - """The full name, e.g., uavcan.node.Heartbeat""" return self._name @property - def name_components(self) -> List[str]: - """Components of the full name as a list, e.g., ['uavcan', 'node', 'Heartbeat']""" + def name_components(self) -> list[str]: return self._name.split(CompositeType.NAME_COMPONENT_SEPARATOR) @property def short_name(self) -> str: - """The last component of the full name, e.g., Heartbeat of uavcan.node.Heartbeat""" return 
self.name_components[-1] @property def full_namespace(self) -> str: - """The full name without the short name, e.g., uavcan.node for uavcan.node.Heartbeat""" return str(CompositeType.NAME_COMPONENT_SEPARATOR.join(self.name_components[:-1])) @property def root_namespace(self) -> str: - """The first component of the full name, e.g., uavcan of uavcan.node.Heartbeat""" return self.name_components[0] @property def text(self) -> str: - """The source text in its raw unprocessed form (with comments, formatting intact, and everything)""" + if self._text is None: + with open(self._file_path) as f: + self._text = str(f.read()) return self._text @property @@ -186,8 +267,7 @@ def version(self) -> Version: return self._version @property - def fixed_port_id(self) -> Optional[int]: - """Either the fixed port ID as integer, or None if not defined for this type.""" + def fixed_port_id(self) -> int | None: return self._fixed_port_id @property @@ -202,6 +282,12 @@ def file_path(self) -> Path: def root_namespace_path(self) -> Path: return self._root_namespace_path + # +-----------------------------------------------------------------------+ + # | Python :: SPECIAL FUNCTIONS | + # +-----------------------------------------------------------------------+ + def __hash__(self) -> int: + return hash((self.full_name, self.version)) + def __eq__(self, other: object) -> bool: """ Two definitions will compare equal if they share the same name AND version number. @@ -222,6 +308,158 @@ def __str__(self) -> str: __repr__ = __str__ -# Moved this import here to break recursive dependency. -# Maybe I have messed up the architecture? Should think about it later. -from . 
import _data_type_builder # pylint: disable=wrong-import-position +# +-[UNIT TESTS]------------------------------------------------------------------------------------------------------+ + + +def _unittest_dsdl_definition_read_non_existent() -> None: + from pytest import raises as expect_raises + + target = Path("root", "ns", "Target.1.1.dsdl") + target_definition = DSDLDefinition(target, target.parent) + + def print_output(line_number: int, text: str) -> None: # pragma: no cover + _ = line_number, text + + with expect_raises(InvalidDefinitionError): + target_definition.read([], [], print_output, True) + + +def _unittest_dsdl_definition_read_text(temp_dsdl_factory) -> None: # type: ignore + from pytest import raises as expect_raises + + target_root = Path("root", "ns") + target_file_path = Path(target_root / "Target.1.1.dsdl") + dsdl_file = temp_dsdl_factory.new_file(target_root / target_file_path, "@sealed") + with expect_raises(ValueError): + _target_definition = DSDLDefinition(dsdl_file, target_root) + # we test first that we can't create the object until we have a target_root that contains the dsdl_file + + target_definition = DSDLDefinition(dsdl_file, dsdl_file.parent.parent) + assert "@sealed" == target_definition.text + + +def _unittest_type_from_path_inference() -> None: + from pytest import raises as expect_raises + + # pylint: disable=protected-access + + dsdl_file = Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve() + path_to_root = DSDLDefinition._infer_path_to_root_from_first_found(dsdl_file, [Path("/repo/uavcan").resolve()]) + namespace_parts = dsdl_file.parent.relative_to(path_to_root.parent).parts + + assert path_to_root == Path("/repo/uavcan").resolve() + assert namespace_parts == ("uavcan", "foo", "bar") + + # The simplest inference made is when relative dsdl paths are provided with no additional information. 
In this + # case the method assumes that the relative path is the correct and complete namespace of the type: + + # relative path + root = DSDLDefinition._infer_path_to_root_from_first_found(Path("uavcan/foo/bar/435.baz.1.0.dsdl"), []) + assert root == Path("uavcan") + + # The root namespace is not inferred in an absolute path without additional data: + + with expect_raises(PathInferenceError): + _ = DSDLDefinition._infer_path_to_root_from_first_found( + Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), [] + ) + + with expect_raises(ValueError): + _ = DSDLDefinition._infer_path_to_root_from_first_found( + Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), None # type: ignore + ) + + # If an absolute path is provided along with a path-to-root "hint" then the former must be relative to the + # latter: + + # dsdl file path is not contained within the root path + with expect_raises(PathInferenceError): + _ = DSDLDefinition._infer_path_to_root_from_first_found( + Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), [Path("/not-a-repo").resolve()] + ) + + root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), [Path("/repo/uavcan").resolve()] + ) + assert root == Path("/repo/uavcan").resolve() + + # The priority is given to paths that are relative to the root when both simple root names and paths are provided: + root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), [Path("foo"), Path("/repo/uavcan").resolve()] + ) + assert root == Path("/repo/uavcan").resolve() + + root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("repo/uavcan/foo/bar/435.baz.1.0.dsdl"), [Path("foo"), Path("repo/uavcan")] + ) + assert root == Path("repo/uavcan") + + # Finally, the method will infer the root namespace from simple folder names if no additional information is + # provided: + + valid_roots = [Path("uavcan"), Path("cyphal")] + + # 
absolute dsdl path using valid roots + root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), valid_roots + ) + assert root == Path("/repo/uavcan").resolve() + + # relative dsdl path using valid roots + root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("repo/uavcan/foo/bar/435.baz.1.0.dsdl"), valid_roots + ) + assert root == Path("repo/uavcan") + + # absolute dsdl path using valid roots but an invalid file path + with expect_raises(PathInferenceError): + _ = DSDLDefinition._infer_path_to_root_from_first_found( + Path("/repo/crap/foo/bar/435.baz.1.0.dsdl").resolve(), valid_roots + ) + + # relative dsdl path using valid roots but an invalid file path + with expect_raises(PathInferenceError): + _ = DSDLDefinition._infer_path_to_root_from_first_found(Path("repo/crap/foo/bar/435.baz.1.0.dsdl"), valid_roots) + + # relative dsdl path with invalid root fragments + invalid_root_fragments = [Path("cyphal", "acme")] + with expect_raises(PathInferenceError): + _ = DSDLDefinition._infer_path_to_root_from_first_found( + Path("repo/crap/foo/bar/435.baz.1.0.dsdl"), invalid_root_fragments + ) + + # In this example, foo/bar might look like a valid root path but it is not relative to repo/uavcan/foo/bar and is + # not considered after relative path inference has failed because it is not a simple root name. 
+ root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("repo/uavcan/foo/bar/435.baz.1.0.dsdl"), [Path("foo/bar"), Path("foo")] + ) + assert root == Path("repo/uavcan/foo") + + # when foo/bar is placed within the proper, relative path it is considered as a valid root and is preferred over + # the simple root name "foo": + root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("repo/uavcan/foo/bar/435.baz.1.0.dsdl"), [Path("repo/uavcan/foo/bar"), Path("foo")] + ) + assert root == Path("repo/uavcan/foo/bar") + + # Sometimes the root paths have crap in them and need to be resolved: + + root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("/path/to/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), + [Path("/path/to/repo/uavcan/../uavcan").resolve()], + ) + assert root == Path("/path/to/repo/uavcan").resolve() + + # Let's ensure ordering here + + root = DSDLDefinition._infer_path_to_root_from_first_found( + Path("repo/uavcan/foo/bar/435.baz.1.0.dsdl"), [Path("repo/uavcan"), Path("repo/uavcan/foo")] + ) + assert root == Path("repo/uavcan") + + +def _unittest_from_first_in() -> None: + dsdl_def = DSDLDefinition.from_first_in( + Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), [Path("/repo/uavcan/foo/..").resolve()] + ) + assert dsdl_def.full_name == "uavcan.foo.bar.baz" diff --git a/pydsdl/_error.py b/pydsdl/_error.py index d301765..283486f 100644 --- a/pydsdl/_error.py +++ b/pydsdl/_error.py @@ -108,6 +108,9 @@ class InvalidDefinitionError(FrontendError): """ +# +-[UNIT TESTS]------------------------------------------------------------------------------------------------------+ + + def _unittest_error() -> None: try: raise FrontendError("Hello world!") @@ -124,8 +127,8 @@ def _unittest_error() -> None: try: raise FrontendError("Hello world!", path=Path("path/to/file.dsdl")) except Exception as ex: - assert str(ex) == "path/to/file.dsdl: Hello world!" 
assert repr(ex) == "FrontendError: 'path/to/file.dsdl: Hello world!'" + assert str(ex) == "path/to/file.dsdl: Hello world!" def _unittest_internal_error_github_reporting() -> None: @@ -150,7 +153,7 @@ def _unittest_internal_error_github_reporting() -> None: print(ex) assert ex.path == Path("FILE_PATH") assert ex.line == 42 - # We have to ignore the last couple of characters because Python before 3.7 reprs Exceptions like this: + # We have to ignore the last couple of characters because Python before 3.7 repr's Exceptions like this: # Exception('ERROR TEXT',) # But newer Pythons do it like this: # Exception('ERROR TEXT') diff --git a/pydsdl/_namespace.py b/pydsdl/_namespace.py index 8e9501e..065c739 100644 --- a/pydsdl/_namespace.py +++ b/pydsdl/_namespace.py @@ -4,13 +4,20 @@ # pylint: disable=logging-not-lazy -from typing import Iterable, Callable, DefaultDict, List, Optional, Union, Set, Dict -import logging +from __future__ import annotations import collections +import logging +from itertools import product, repeat from pathlib import Path -from . import _serializable -from . import _dsdl_definition -from . import _error +from typing import Callable, DefaultDict, Iterable + +from . 
import _dsdl_definition, _error, _serializable +from ._dsdl import ReadableDSDLFile, PrintOutputHandler, SortedFileList +from ._dsdl import file_sort as dsdl_file_sort +from ._dsdl import normalize_paths_argument_to_list +from ._namespace_reader import DSDLDefinitions, read_definitions + +_logger = logging.getLogger(__name__) class RootNamespaceNameCollisionError(_error.InvalidDefinitionError): @@ -69,19 +76,18 @@ class SealingConsistencyError(_error.InvalidDefinitionError): """ -PrintOutputHandler = Callable[[Path, int, str], None] -"""Invoked when the frontend encounters a print directive or needs to output a generic diagnostic.""" +# +--[PUBLIC API]-----------------------------------------------------------------------------------------------------+ def read_namespace( - root_namespace_directory: Union[Path, str], - lookup_directories: Union[None, Path, str, Iterable[Union[Path, str]]] = None, - print_output_handler: Optional[PrintOutputHandler] = None, + root_namespace_directory: Path | str, + lookup_directories: None | Path | str | Iterable[Path | str] = None, + print_output_handler: PrintOutputHandler | None = None, allow_unregulated_fixed_port_id: bool = False, allow_root_namespace_name_collision: bool = True, -) -> List[_serializable.CompositeType]: +) -> list[_serializable.CompositeType]: """ - This function is the main entry point of the library. + This function is a main entry point for the library. It reads all DSDL definitions from the specified root namespace directory and produces the annotated AST. :param root_namespace_directory: The path of the root namespace directory that will be read. @@ -108,48 +114,26 @@ def read_namespace( the same root namespace name multiple times in the lookup dirs. This will enable defining a namespace partially and let other entities define new messages or new sub-namespaces in the same root namespace. 
- :return: A list of :class:`pydsdl.CompositeType` sorted lexicographically by full data type name, - then by major version (newest version first), then by minor version (newest version first). - The ordering guarantee allows the caller to always find the newest version simply by picking - the first matching occurrence. + :return: A list of :class:`pydsdl.CompositeType` found under the `root_namespace_directory` and sorted + lexicographically by full data type name, then by major version (newest version first), then by minor + version (newest version first). The ordering guarantee allows the caller to always find the newest version + simply by picking the first matching occurrence. :raises: :class:`pydsdl.FrontendError`, :class:`MemoryError`, :class:`SystemError`, :class:`OSError` if directories do not exist or inaccessible, :class:`ValueError`/:class:`TypeError` if the arguments are invalid. """ - # Add the own root namespace to the set of lookup directories, sort lexicographically, remove duplicates. - # We'd like this to be an iterable list of strings but we handle the common practice of passing in a single path. - if lookup_directories is None: - lookup_directories_path_list: List[Path] = [] - elif isinstance(lookup_directories, (str, bytes, Path)): - lookup_directories_path_list = [Path(lookup_directories)] - else: - lookup_directories_path_list = list(map(Path, lookup_directories)) - - for a in lookup_directories_path_list: - if not isinstance(a, (str, Path)): - raise TypeError("Lookup directories shall be an iterable of paths. Found in list: " + type(a).__name__) - _logger.debug(_LOG_LIST_ITEM_PREFIX + str(a)) - # Normalize paths and remove duplicates. Resolve symlinks to avoid ambiguities. 
root_namespace_directory = Path(root_namespace_directory).resolve() - lookup_directories_path_list.append(root_namespace_directory) - lookup_directories_path_list = list(sorted({x.resolve() for x in lookup_directories_path_list})) - _logger.debug("Lookup directories are listed below:") - for a in lookup_directories_path_list: - _logger.debug(_LOG_LIST_ITEM_PREFIX + str(a)) - # Check for common usage errors and warn the user if anything looks suspicious. - _ensure_no_common_usage_errors(root_namespace_directory, lookup_directories_path_list, _logger.warning) - - # Check the namespaces. - _ensure_no_nested_root_namespaces(lookup_directories_path_list) - - if not allow_root_namespace_name_collision: - _ensure_no_namespace_name_collisions(lookup_directories_path_list) + lookup_directories_path_list = _construct_lookup_directories_path_list( + [root_namespace_directory], + normalize_paths_argument_to_list(lookup_directories), + allow_root_namespace_name_collision, + ) # Construct DSDL definitions from the target and the lookup dirs. 
- target_dsdl_definitions = _construct_dsdl_definitions_from_namespace(root_namespace_directory) + target_dsdl_definitions = _construct_dsdl_definitions_from_namespaces([root_namespace_directory]) if not target_dsdl_definitions: _logger.info("The namespace at %s is empty", root_namespace_directory) return [] @@ -157,9 +141,124 @@ def read_namespace( for x in target_dsdl_definitions: _logger.debug(_LOG_LIST_ITEM_PREFIX + str(x)) - lookup_dsdl_definitions = [] # type: List[_dsdl_definition.DSDLDefinition] - for ld in lookup_directories_path_list: - lookup_dsdl_definitions += _construct_dsdl_definitions_from_namespace(ld) + return _complete_read_function( + target_dsdl_definitions, lookup_directories_path_list, print_output_handler, allow_unregulated_fixed_port_id + ).direct + + +# pylint: disable=too-many-arguments +def read_files( + dsdl_files: None | Path | str | Iterable[Path | str], + root_namespace_directories_or_names: None | Path | str | Iterable[Path | str], + lookup_directories: None | Path | str | Iterable[Path | str] = None, + print_output_handler: PrintOutputHandler | None = None, + allow_unregulated_fixed_port_id: bool = False, +) -> tuple[list[_serializable.CompositeType], list[_serializable.CompositeType]]: + """ + This function is a main entry point for the library. + It reads all DSDL definitions from the specified ``dsdl_files`` and produces the annotated AST for these types and + the transitive closure of the types they depend on. + + :param dsdl_files: A list of paths to dsdl files to parse. + + :param root_namespace_directories_or_names: This can be a set of names of root namespaces or relative paths to + root namespaces. All ``dsdl_files`` provided must be under one of these roots. For example, given: + + .. 
code-block:: python + + dsdl_files = [ + Path("workspace/project/types/animals/felines/Tabby.1.0.dsdl"), + Path("workspace/project/types/animals/canines/Boxer.1.0.dsdl"), + Path("workspace/project/types/plants/trees/DouglasFir.1.0.dsdl") + ] + + + then this argument must be one of: + + .. code-block:: python + + root_namespace_directories_or_names = ["animals", "plants"] + + root_namespace_directories_or_names = [ + Path("workspace/project/types/animals"), + Path("workspace/project/types/plants") + ] + + + :param lookup_directories: List of other namespace directories containing data type definitions that are + referred to from the target dsdl files. For example, if you are reading vendor-specific types, + the list of lookup directories should always include a path to the standard root namespace ``uavcan``, + otherwise the types defined in the vendor-specific namespace won't be able to use data types from the + standard namespace. + + :param print_output_handler: If provided, this callable will be invoked when a ``@print`` directive + is encountered or when the frontend needs to emit a diagnostic; + the arguments are: path, line number (1-based), text. + If not provided, no output will be produced except for the standard Python logging subsystem + (but ``@print`` expressions will be evaluated anyway, and a failed evaluation will be a fatal error). + + :param allow_unregulated_fixed_port_id: Do not reject unregulated fixed port identifiers. + As demanded by the specification, the frontend rejects unregulated fixed port ID by default. + This is a dangerous feature that must not be used unless you understand the risks. + Please read https://opencyphal.org/guide. + + :return: A Tuple of lists of :class:`pydsdl.CompositeType`. The first index in the Tuple are the types parsed from + the ``dsdl_files`` argument. The second index are types that the target ``dsdl_files`` utilizes. 
+ A note for using these values to describe build dependencies: each :class:`pydsdl.CompositeType` has two + fields that provide links back to the filesystem where the dsdl files were located when parsing the type; + ``source_file_path`` and ``source_file_path_to_root``. + + :raises: :class:`pydsdl.FrontendError`, :class:`MemoryError`, :class:`SystemError`, + :class:`OSError` if directories do not exist or inaccessible, + :class:`ValueError`/:class:`TypeError` if the arguments are invalid. + """ + # Normalize paths and remove duplicates. Resolve symlinks to avoid ambiguities. + target_dsdl_definitions = _construct_dsdl_definitions_from_files( + normalize_paths_argument_to_list(dsdl_files), + normalize_paths_argument_to_list(root_namespace_directories_or_names), + ) + if len(target_dsdl_definitions) == 0: + _logger.info("No DSDL files found in the specified directories") + return ([], []) + + if _logger.isEnabledFor(logging.DEBUG): # pragma: no cover + _logger.debug("Target DSDL definitions are listed below:") + + for x in target_dsdl_definitions: + _logger.debug(_LOG_LIST_ITEM_PREFIX + str(x.file_path)) + + root_namespaces = {f.root_namespace_path.resolve() for f in target_dsdl_definitions} + lookup_directories_path_list = _construct_lookup_directories_path_list( + root_namespaces, + normalize_paths_argument_to_list(lookup_directories), + True, + ) + + definitions = _complete_read_function( + target_dsdl_definitions, lookup_directories_path_list, print_output_handler, allow_unregulated_fixed_port_id + ) + + return (definitions.direct, definitions.transitive) + + +# +--[INTERNAL API::PUBLIC API HELPERS]-------------------------------------------------------------------------------+ +# These are functions called by the public API before the actual processing begins. 
+ +DSDL_FILE_SUFFIX = ".dsdl" +DSDL_FILE_GLOB = f"*{DSDL_FILE_SUFFIX}" +DSDL_FILE_SUFFIX_LEGACY = ".uavcan" +DSDL_FILE_GLOB_LEGACY = f"*{DSDL_FILE_SUFFIX_LEGACY}" +_LOG_LIST_ITEM_PREFIX = " " * 4 + + +def _complete_read_function( + target_dsdl_definitions: SortedFileList[ReadableDSDLFile], + lookup_directories_path_list: list[Path], + print_output_handler: PrintOutputHandler | None, + allow_unregulated_fixed_port_id: bool, +) -> DSDLDefinitions: + + lookup_dsdl_definitions = _construct_dsdl_definitions_from_namespaces(lookup_directories_path_list) # Check for collisions against the lookup definitions also. _ensure_no_collisions(target_dsdl_definitions, lookup_dsdl_definitions) @@ -177,8 +276,9 @@ def read_namespace( ", ".join(set(sorted(map(lambda t: t.root_namespace, lookup_dsdl_definitions)))), ) - # Read the constructed definitions. - types = _read_namespace_definitions( + # This is the biggie. All the rest of the wrangling is just to get to this point. This will take the + # most time and memory. + definitions = read_definitions( target_dsdl_definitions, lookup_dsdl_definitions, print_output_handler, allow_unregulated_fixed_port_id ) @@ -188,62 +288,107 @@ def read_namespace( # directories may contain issues and mistakes that are outside of the control of the user (e.g., # they could be managed by a third party) -- the user shouldn't be affected by mistakes committed # by the third party. 
- _ensure_no_fixed_port_id_collisions(types) - _ensure_minor_version_compatibility(types) + _ensure_no_fixed_port_id_collisions(definitions.direct) + _ensure_minor_version_compatibility(definitions.transitive + definitions.direct) - return types + return definitions -DSDL_FILE_GLOB = "*.dsdl" -DSDL_FILE_GLOB_LEGACY = "*.uavcan" -_LOG_LIST_ITEM_PREFIX = " " * 4 +def _construct_lookup_directories_path_list( + root_namespace_directories: Iterable[Path], + lookup_directories_path_list: list[Path], + allow_root_namespace_name_collision: bool, +) -> list[Path]: + """ + Intermediate transformation and validation of inputs into a list of lookup directories as paths. -_logger = logging.getLogger(__name__) + :param root_namespace_directories: The path of the root namespace directory that will be read. + For example, ``dsdl/uavcan`` to read the ``uavcan`` namespace. + :param lookup_directories_path_list: List of other namespace directories containing data type definitions that are + referred to from the target root namespace. For example, if you are reading a vendor-specific namespace, + the list of lookup directories should always include a path to the standard root namespace ``uavcan``, + otherwise the types defined in the vendor-specific namespace won't be able to use data types from the + standard namespace. -def _read_namespace_definitions( - target_definitions: List[_dsdl_definition.DSDLDefinition], - lookup_definitions: List[_dsdl_definition.DSDLDefinition], - print_output_handler: Optional[PrintOutputHandler] = None, - allow_unregulated_fixed_port_id: bool = False, -) -> List[_serializable.CompositeType]: - """ - Construct type descriptors from the specified target definitions. - Allow the target definitions to use the lookup definitions within themselves. - :param target_definitions: Which definitions to read. - :param lookup_definitions: Which definitions can be used by the processed definitions. - :return: A list of types. 
+ :param allow_root_namespace_name_collision: Allow using the source root namespace name in the look up dirs or + the same root namespace name multiple times in the lookup dirs. This will enable defining a namespace + partially and let other entities define new messages or new sub-namespaces in the same root namespace. + + :return: A list of lookup directories as paths. + + :raises: :class:`pydsdl.FrontendError`, :class:`MemoryError`, :class:`SystemError`, + :class:`OSError` if directories do not exist or inaccessible, + :class:`ValueError`/:class:`TypeError` if the arguments are invalid. """ + # Add the own root namespace to the set of lookup directories, sort lexicographically, remove duplicates. + # We'd like this to be an iterable list of strings but we handle the common practice of passing in a single path. - def make_print_handler(definition: _dsdl_definition.DSDLDefinition) -> Callable[[int, str], None]: - def handler(line_number: int, text: str) -> None: - if print_output_handler: # pragma: no branch - assert isinstance(line_number, int) and isinstance(text, str) - assert line_number > 0, "Line numbers must be one-based" - print_output_handler(definition.file_path, line_number, text) + # Normalize paths and remove duplicates. Resolve symlinks to avoid ambiguities. + lookup_directories_path_list.extend(root_namespace_directories) + lookup_directories_path_list = list(sorted({x.resolve() for x in lookup_directories_path_list})) + _logger.debug("Lookup directories are listed below:") + for a in lookup_directories_path_list: + _logger.debug(_LOG_LIST_ITEM_PREFIX + str(a)) + + # Check for common usage errors and warn the user if anything looks suspicious. + _ensure_no_common_usage_errors(root_namespace_directories, lookup_directories_path_list, _logger.warning) - return handler + # Check the namespaces and ensure that there are no name collisions. 
+ _ensure_no_namespace_name_collisions_or_nested_root_namespaces( + lookup_directories_path_list, allow_root_namespace_name_collision + ) - types = [] # type: List[_serializable.CompositeType] - for tdd in target_definitions: - try: - dt = tdd.read(lookup_definitions, make_print_handler(tdd), allow_unregulated_fixed_port_id) - except _error.FrontendError as ex: # pragma: no cover - ex.set_error_location_if_unknown(path=tdd.file_path) - raise ex - except (MemoryError, SystemError): # pragma: no cover - raise - except Exception as ex: # pragma: no cover - raise _error.InternalError(culprit=ex, path=tdd.file_path) from ex - else: - types.append(dt) + return lookup_directories_path_list + + +def _construct_dsdl_definitions_from_files( + dsdl_files: list[Path], + valid_roots: list[Path], +) -> SortedFileList[ReadableDSDLFile]: + """ """ + output = set() # type: set[ReadableDSDLFile] + for fp in dsdl_files: + resolved_fp = fp.resolve(strict=False) + if resolved_fp.suffix == DSDL_FILE_SUFFIX_LEGACY: + _logger.warning( + "File uses deprecated extension %r, please rename to use %r: %s", + DSDL_FILE_SUFFIX_LEGACY, + DSDL_FILE_SUFFIX, + resolved_fp, + ) + output.add(_dsdl_definition.DSDLDefinition.from_first_in(resolved_fp, list(valid_roots))) - return types + return dsdl_file_sort(output) + + +def _construct_dsdl_definitions_from_namespaces( + root_namespace_paths: list[Path], +) -> SortedFileList[ReadableDSDLFile]: + """ + Accepts a directory path, returns a sorted list of abstract DSDL file representations. Those can be read later. + The definitions are sorted by name lexicographically, then by major version (greatest version first), + then by minor version (same ordering as the major version). 
+ """ + source_file_paths: set[tuple[Path, Path]] = set() # index of all file paths already found + for root_namespace_path in root_namespace_paths: + for p in root_namespace_path.rglob(DSDL_FILE_GLOB): + source_file_paths.add((p, root_namespace_path)) + for p in root_namespace_path.rglob(DSDL_FILE_GLOB_LEGACY): + source_file_paths.add((p, root_namespace_path)) + _logger.warning( + "File uses deprecated extension %r, please rename to use %r: %s", + DSDL_FILE_GLOB_LEGACY, + DSDL_FILE_GLOB, + p, + ) + + return dsdl_file_sort([_dsdl_definition.DSDLDefinition(*p) for p in source_file_paths]) def _ensure_no_collisions( - target_definitions: List[_dsdl_definition.DSDLDefinition], - lookup_definitions: List[_dsdl_definition.DSDLDefinition], + target_definitions: list[ReadableDSDLFile], + lookup_definitions: list[ReadableDSDLFile], ) -> None: for tg in target_definitions: tg_full_namespace_period = tg.full_namespace.lower() + "." @@ -279,7 +424,7 @@ def _ensure_no_collisions( raise DataTypeCollisionError("This type is redefined in %s" % lu.file_path, path=tg.file_path) -def _ensure_no_fixed_port_id_collisions(types: List[_serializable.CompositeType]) -> None: +def _ensure_no_fixed_port_id_collisions(types: list[_serializable.CompositeType]) -> None: for a in types: for b in types: different_names = a.full_name != b.full_name @@ -300,13 +445,13 @@ def _ensure_no_fixed_port_id_collisions(types: List[_serializable.CompositeType] ) -def _ensure_minor_version_compatibility(types: List[_serializable.CompositeType]) -> None: - by_name = collections.defaultdict(list) # type: DefaultDict[str, List[_serializable.CompositeType]] +def _ensure_minor_version_compatibility(types: list[_serializable.CompositeType]) -> None: + by_name = collections.defaultdict(list) # type: DefaultDict[str, list[_serializable.CompositeType]] for t in types: by_name[t.full_name].append(t) for definitions in by_name.values(): - by_major = collections.defaultdict(list) # type: DefaultDict[int, 
List[_serializable.CompositeType]] + by_major = collections.defaultdict(list) # type: DefaultDict[int, list[_serializable.CompositeType]] for t in definitions: by_major[t.version.major].append(t) @@ -375,7 +520,7 @@ def _ensure_minor_version_compatibility_pairwise( def _ensure_no_common_usage_errors( - root_namespace_directory: Path, lookup_directories: Iterable[Path], reporter: Callable[[str], None] + root_namespace_directories: Iterable[Path], lookup_directories: Iterable[Path], reporter: Callable[[str], None] ) -> None: suspicious_base_names = [ "public_regulated_data_types", @@ -391,7 +536,7 @@ def is_valid_name(s: str) -> bool: return True # resolve() will also normalize the case in case-insensitive filesystems. - all_paths = {root_namespace_directory.resolve()} | {x.resolve() for x in lookup_directories} + all_paths = {y.resolve() for y in root_namespace_directories} | {x.resolve() for x in lookup_directories} for p in all_paths: try: candidates = [x for x in p.iterdir() if x.is_dir() and is_valid_name(x.name)] @@ -408,59 +553,51 @@ def is_valid_name(s: str) -> bool: reporter(report) -def _ensure_no_nested_root_namespaces(directories: Iterable[Path]) -> None: - dirs = {x.resolve() for x in directories} # normalize the case in case-insensitive filesystems - for a in dirs: - for b in dirs: - if a.samefile(b): - continue +def _ensure_no_namespace_name_collisions_or_nested_root_namespaces( + directories: Iterable[Path], allow_name_collisions: bool +) -> None: + directories = {x.resolve() for x in directories} # normalize the case in case-insensitive filesystems + + def check_each(path_tuple_with_result: tuple[tuple[Path, Path], list[int]]) -> bool: + path_tuple = path_tuple_with_result[0] + if not path_tuple[0].samefile(path_tuple[1]): + if not allow_name_collisions and path_tuple[0].name.lower() == path_tuple[1].name.lower(): + return True try: - a.relative_to(b) + path_tuple[0].relative_to(path_tuple[1]) except ValueError: pass else: - raise 
NestedRootNamespaceError( - "The following namespace is nested inside this one, which is not permitted: %s" % a, path=b - ) - - -def _ensure_no_namespace_name_collisions(directories: Iterable[Path]) -> None: - directories = {x.resolve() for x in directories} # normalize the case in case-insensitive filesystems - for a in directories: - for b in directories: - if a.samefile(b): - continue - if a.name.lower() == b.name.lower(): - _logger.info("Collision: %r [%r] == %r [%r]", a, a.name, b, b.name) - raise RootNamespaceNameCollisionError("The name of this namespace conflicts with %s" % b, path=a) - - -def _construct_dsdl_definitions_from_namespace(root_namespace_path: Path) -> List[_dsdl_definition.DSDLDefinition]: - """ - Accepts a directory path, returns a sorted list of abstract DSDL file representations. Those can be read later. - The definitions are sorted by name lexicographically, then by major version (greatest version first), - then by minor version (same ordering as the major version). - """ - source_file_paths: Set[Path] = set() - for p in root_namespace_path.rglob(DSDL_FILE_GLOB): - source_file_paths.add(p) - for p in root_namespace_path.rglob(DSDL_FILE_GLOB_LEGACY): - source_file_paths.add(p) - _logger.warning( - "File uses deprecated extension %r, please rename to use %r: %s", DSDL_FILE_GLOB_LEGACY, DSDL_FILE_GLOB, p - ) + path_tuple_with_result[1][0] = 1 + return True + return False + + # zip a list[1] of int 0 so we can assign a failure type. 
0 indicates a root namespace name collision and 1 indicates a nested root namespace
"nested/Malformed.MAJOR.MINOR.dsdl").touch() try: - _construct_dsdl_definitions_from_namespace(root) + _construct_dsdl_definitions_from_namespaces([root]) except FileNameFormatError as ex: print(ex) (root / "nested/Malformed.MAJOR.MINOR.dsdl").unlink() @@ -537,7 +674,7 @@ def _unittest_dsdl_definition_constructor() -> None: (root / "nested/NOT_A_NUMBER.Malformed.1.0.dsdl").touch() try: - _construct_dsdl_definitions_from_namespace(root) + _construct_dsdl_definitions_from_namespaces([root]) except FileNameFormatError as ex: print(ex) (root / "nested/NOT_A_NUMBER.Malformed.1.0.dsdl").unlink() @@ -546,26 +683,26 @@ def _unittest_dsdl_definition_constructor() -> None: (root / "nested/Malformed.dsdl").touch() try: - _construct_dsdl_definitions_from_namespace(root) + _construct_dsdl_definitions_from_namespaces([root]) except FileNameFormatError as ex: print(ex) (root / "nested/Malformed.dsdl").unlink() else: # pragma: no cover assert False - _construct_dsdl_definitions_from_namespace(root) # making sure all errors are cleared + _construct_dsdl_definitions_from_namespaces([root]) # making sure all errors are cleared (root / "nested/super.bad").mkdir() (root / "nested/super.bad/Unreachable.1.0.dsdl").touch() try: - _construct_dsdl_definitions_from_namespace(root) + _construct_dsdl_definitions_from_namespaces([root]) except FileNameFormatError as ex: print(ex) else: # pragma: no cover assert False try: - _construct_dsdl_definitions_from_namespace(root / "nested/super.bad") + _construct_dsdl_definitions_from_namespaces([root / "nested/super.bad"]) except FileNameFormatError as ex: print(ex) else: # pragma: no cover @@ -582,9 +719,9 @@ def _unittest_dsdl_definition_constructor_legacy() -> None: root = di / "foo" root.mkdir() (root / "123.Qwerty.123.234.uavcan").write_text("# TEST A") - dsdl_defs = _construct_dsdl_definitions_from_namespace(root) + dsdl_defs = _construct_dsdl_definitions_from_namespaces([root]) print(dsdl_defs) - lut = {x.full_name: x for x in dsdl_defs} # 
type: Dict[str, _dsdl_definition.DSDLDefinition] + lut = {x.full_name: x for x in dsdl_defs} # type: dict[str, ReadableDSDLFile] assert len(lut) == 1 t = lut["foo.Qwerty"] assert t.file_path == root / "123.Qwerty.123.234.uavcan" @@ -607,35 +744,36 @@ def _unittest_common_usage_errors() -> None: root_ns_dir = di / "foo" root_ns_dir.mkdir() - reports = [] # type: List[str] + reports = [] # type: list[str] - _ensure_no_common_usage_errors(root_ns_dir, [], reports.append) + _ensure_no_common_usage_errors([root_ns_dir], [], reports.append) assert not reports - _ensure_no_common_usage_errors(root_ns_dir, [di / "baz"], reports.append) + _ensure_no_common_usage_errors([root_ns_dir], [di / "baz"], reports.append) assert not reports dir_dsdl = root_ns_dir / "dsdl" dir_dsdl.mkdir() - _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + _ensure_no_common_usage_errors([dir_dsdl], [di / "baz"], reports.append) assert not reports # Because empty. dir_dsdl_vscode = dir_dsdl / ".vscode" dir_dsdl_vscode.mkdir() - _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + _ensure_no_common_usage_errors([dir_dsdl], [di / "baz"], reports.append) assert not reports # Because the name is not valid. 
dir_dsdl_uavcan = dir_dsdl / "uavcan" dir_dsdl_uavcan.mkdir() - _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + _ensure_no_common_usage_errors([dir_dsdl], [di / "baz"], reports.append) (rep,) = reports reports.clear() assert str(dir_dsdl_uavcan.resolve()).lower() in rep.lower() def _unittest_nested_roots() -> None: - from pytest import raises import tempfile + from pytest import raises + with tempfile.TemporaryDirectory() as directory: di = Path(directory) (di / "a").mkdir() @@ -643,13 +781,13 @@ def _unittest_nested_roots() -> None: (di / "a/b").mkdir() (di / "a/c").mkdir() (di / "aa/b").mkdir() - _ensure_no_nested_root_namespaces([]) - _ensure_no_nested_root_namespaces([di / "a"]) - _ensure_no_nested_root_namespaces([di / "a/b", di / "a/c"]) + _ensure_no_namespace_name_collisions_or_nested_root_namespaces([], True) + _ensure_no_namespace_name_collisions_or_nested_root_namespaces([di / "a"], True) + _ensure_no_namespace_name_collisions_or_nested_root_namespaces([di / "a/b", di / "a/c"], True) with raises(NestedRootNamespaceError): - _ensure_no_nested_root_namespaces([di / "a/b", di / "a"]) - _ensure_no_nested_root_namespaces([di / "aa/b", di / "a"]) - _ensure_no_nested_root_namespaces([di / "a/b", di / "aa"]) + _ensure_no_namespace_name_collisions_or_nested_root_namespaces([di / "a/b", di / "a"], True) + _ensure_no_namespace_name_collisions_or_nested_root_namespaces([di / "aa/b", di / "a"], True) + _ensure_no_namespace_name_collisions_or_nested_root_namespaces([di / "a/b", di / "aa"], True) def _unittest_issue_71() -> None: # https://github.com/OpenCyphal/pydsdl/issues/71 @@ -663,3 +801,85 @@ def _unittest_issue_71() -> None: # https://github.com/OpenCyphal/pydsdl/issues (real / "Msg.0.1.dsdl").write_text("@sealed") assert len(read_namespace(real, [real, link])) == 1 assert len(read_namespace(link, [real, link])) == 1 + + +def _unittest_type_read_files_example(temp_dsdl_factory) -> None: # type: ignore + # let's test the comments for the 
read function + dsdl_files = [ + Path("workspace/project/types/animals/felines/Tabby.1.0.uavcan"), # keep .uavcan to cover the warning + Path("workspace/project/types/animals/canines/Boxer.1.0.dsdl"), + Path("workspace/project/types/plants/trees/DouglasFir.1.0.dsdl"), + ] + + dsdl_files_abs = [] + root_namespace_paths = set() + for dsdl_file in dsdl_files: + dsdl_files_abs.append(temp_dsdl_factory.new_file(dsdl_file, "@sealed")) + root_namespace_paths.add(temp_dsdl_factory.base_dir / dsdl_file.parent.parent) + root_namespace_directories_or_names_simple = ["animals", "plants"] + + direct, transitive = read_files(dsdl_files_abs, root_namespace_directories_or_names_simple) + + assert len(direct) == len(dsdl_files) + assert len(transitive) == 0 + + for direct_type in direct: + assert direct_type.root_namespace in root_namespace_directories_or_names_simple + assert direct_type.source_file_path_to_root in root_namespace_paths + + direct, _ = read_files(dsdl_files_abs, root_namespace_paths) + + assert len(direct) == len(dsdl_files) + + for direct_type in direct: + assert direct_type.root_namespace in root_namespace_directories_or_names_simple + assert direct_type.source_file_path_to_root in root_namespace_paths + + +def _unittest_targets_found_in_lookup_namespaces(temp_dsdl_factory) -> None: # type: ignore + + # call read_files with a list of dsdl files which are also located in the provided lookup namespaces + + plant_1_0 = Path("types/plants/Plant.1.0.dsdl") + tree_1_0 = Path("types/plants/trees/Tree.1.0.dsdl") + douglas_fir_1_0 = Path("types/plants/trees/DouglasFir.1.0.dsdl") + + plant_file = temp_dsdl_factory.new_file(plant_1_0, "@sealed\n") + test_files = [ + temp_dsdl_factory.new_file(tree_1_0, "@sealed\nplants.Plant.1.0 plt\n"), + temp_dsdl_factory.new_file(douglas_fir_1_0, "@sealed\nplants.trees.Tree.1.0 tree\n"), + ] + lookup_dirs = [plant_file.parent] + + direct, transitive = read_files(test_files, lookup_dirs) + + assert len(direct) == len(test_files) + assert 
len(transitive) == 1 + + +def _unittest_read_files_empty_args() -> None: + direct, transitive = read_files([], []) + + assert len(direct) == 0 + assert len(transitive) == 0 + + +def _unittest_ensure_no_collisions(temp_dsdl_factory) -> None: # type: ignore + from pytest import raises as expect_raises + + _ = temp_dsdl_factory + + # gratuitous coverage of the collision check where other tests don't cover some edge cases + _ensure_no_namespace_name_collisions_or_nested_root_namespaces([], False) + + with expect_raises(DataTypeNameCollisionError): + _ensure_no_collisions( + [_dsdl_definition.DSDLDefinition(Path("a/b.1.0.dsdl"), Path("a"))], + [_dsdl_definition.DSDLDefinition(Path("a/B.1.0.dsdl"), Path("a"))], + ) + + with expect_raises(DataTypeNameCollisionError): + _ensure_no_collisions( + [_dsdl_definition.DSDLDefinition(Path("a/b/c.1.0.dsdl"), Path("a"))], + [_dsdl_definition.DSDLDefinition(Path("a/b.1.0.dsdl"), Path("a"))], + ) diff --git a/pydsdl/_namespace_reader.py b/pydsdl/_namespace_reader.py new file mode 100644 index 0000000..adb5130 --- /dev/null +++ b/pydsdl/_namespace_reader.py @@ -0,0 +1,392 @@ +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. 
+# SPDX-License-Identifier: MIT + +from __future__ import annotations +import functools +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import cast + +from ._dsdl import DefinitionVisitor, DSDLFile, ReadableDSDLFile, PrintOutputHandler, SortedFileList +from ._dsdl import file_sort as dsdl_file_sort +from ._error import FrontendError, InternalError +from ._serializable._composite import CompositeType + + +# pylint: disable=too-many-arguments +def _read_definitions( + target_definitions: SortedFileList[ReadableDSDLFile], + lookup_definitions: SortedFileList[ReadableDSDLFile], + print_output_handler: PrintOutputHandler | None, + allow_unregulated_fixed_port_id: bool, + direct: set[CompositeType], + transitive: set[CompositeType], + file_pool: dict[Path, ReadableDSDLFile], + level: int, +) -> None: + """ + Don't look at me! I'm hideous! + (recursive method with a lot of arguments. See read_definitions for documentation) + """ + + _pending_definitions: set[ReadableDSDLFile] = set() + + class _Callback(DefinitionVisitor): + def on_definition(self, _: DSDLFile, dependency_dsdl_file: ReadableDSDLFile) -> None: + if dependency_dsdl_file.file_path not in file_pool: + _pending_definitions.add(dependency_dsdl_file) + + def print_handler(file: Path, line: int, message: str) -> None: + if print_output_handler is not None: + print_output_handler(file, line, message) + + for target_definition in target_definitions: + + if not isinstance(target_definition, ReadableDSDLFile): + raise TypeError("Expected ReadableDSDLFile, got: " + type(target_definition).__name__) + + target_definition = file_pool.setdefault(target_definition.file_path, target_definition) + # make sure we are working with the same object for a given file path + + if target_definition.composite_type is not None and ( + target_definition.composite_type in direct or target_definition.composite_type in transitive + ): + logging.debug("Skipping target file %s because it has 
already been processed", target_definition.file_path) + if level == 0 and target_definition.composite_type in transitive: + # promote to direct + transitive.remove(target_definition.composite_type) + direct.add(target_definition.composite_type) + continue + + try: + new_composite_type = target_definition.read( + lookup_definitions, + [_Callback()], + functools.partial(print_handler, target_definition.file_path), + allow_unregulated_fixed_port_id, + ) + except FrontendError as ex: # pragma: no cover + ex.set_error_location_if_unknown(path=target_definition.file_path) + raise ex + except Exception as ex: # pragma: no cover + raise InternalError(culprit=ex, path=target_definition.file_path) from ex + + if level == 0: + + direct.add(new_composite_type) + try: + transitive.remove(new_composite_type) + except KeyError: + pass + else: + transitive.add(new_composite_type) + + if len(_pending_definitions) > 0: + _read_definitions( + dsdl_file_sort(_pending_definitions), + lookup_definitions, + print_output_handler, + allow_unregulated_fixed_port_id, + direct, + transitive, + file_pool, + level + 1, + ) + _pending_definitions.clear() + + +# +---[FILE: PUBLIC]--------------------------------------------------------------------------------------------------+ + + +@dataclass(frozen=True) +class DSDLDefinitions: + """ + Common DSDL definition set including the direct dependencies requested and the transitive dependencies found. + The former and latter sets will be disjoint. 
+ """ + + direct: SortedFileList[CompositeType] + transitive: SortedFileList[CompositeType] + + +def read_definitions( + target_definitions: SortedFileList[ReadableDSDLFile], + lookup_definitions: SortedFileList[ReadableDSDLFile], + print_output_handler: PrintOutputHandler | None, + allow_unregulated_fixed_port_id: bool, +) -> DSDLDefinitions: + """ + Given a set of DSDL files, this method reads the text and invokes the parser for each and for any files found in the + lookup set where these are used by the target set. + + :param target_definitions: List of definitions to read. + :param lookup_definitions: List of definitions available for referring to. + :param print_output_handler: Used for @print and for diagnostics: (line_number, text) -> None. + :param allow_unregulated_fixed_port_id: Do not complain about fixed unregulated port IDs. + :return: The data type representation. + :raises InvalidDefinitionError: If a dependency is missing. + :raises InternalError: If an unexpected error occurs. + """ + _direct: set[CompositeType] = set() + _transitive: set[CompositeType] = set() + _file_pool: dict[Path, ReadableDSDLFile] = {} + _read_definitions( + target_definitions, + lookup_definitions, + print_output_handler, + allow_unregulated_fixed_port_id, + _direct, + _transitive, + _file_pool, + 0, + ) + return DSDLDefinitions( + dsdl_file_sort(_direct), + dsdl_file_sort(_transitive), + ) + + +# +-[UNIT TESTS]------------------------------------------------------------------------------------------------------+ + + +def _unittest_namespace_reader_read_definitions(temp_dsdl_factory) -> None: # type: ignore + from . 
import _dsdl_definition + + target = temp_dsdl_factory.new_file(Path("root", "ns", "Target.1.1.dsdl"), "@sealed") + target_definitions = [cast(ReadableDSDLFile, _dsdl_definition.DSDLDefinition(target, target.parent))] + lookup_definitions: list[ReadableDSDLFile] = [] + + read_definitions(target_definitions, lookup_definitions, None, True) + + +def _unittest_namespace_reader_read_definitions_multiple(temp_dsdl_factory) -> None: # type: ignore + from . import _dsdl_definition + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Target.1.1.dsdl"), "@sealed\nns.Aisle.1.0 paper_goods\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "Target.2.0.dsdl"), "@sealed\nns.Aisle.2.0 paper_goods\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "Walmart.2.4.dsdl"), "@sealed\nns.Aisle.1.0 paper_goods\n"), + ] + aisles = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Aisle.1.0.dsdl"), "@sealed"), + temp_dsdl_factory.new_file(Path("root", "ns", "Aisle.2.0.dsdl"), "@sealed"), + temp_dsdl_factory.new_file(Path("root", "ns", "Aisle.3.0.dsdl"), "@sealed"), + ] + + definitions = read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [_dsdl_definition.DSDLDefinition(a, a.parent) for a in aisles], + None, + True, + ) + + assert len(definitions.direct) == 3 + assert len(definitions.transitive) == 2 + + +def _unittest_namespace_reader_read_definitions_multiple_no_load(temp_dsdl_factory) -> None: # type: ignore + """ + Ensure that the loader does not load files that are not in the transitive closure of the target files. + """ + from . 
import _dsdl_definition + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Adams.1.0.dsdl"), "@sealed\nns.Tacoma.1.0 volcano\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "Hood.1.0.dsdl"), "@sealed\nns.Rainer.1.0 volcano\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "StHelens.2.1.dsdl"), "@sealed\nns.Baker.1.0 volcano\n"), + ] + dependencies = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Tacoma.1.0.dsdl"), "@sealed"), + temp_dsdl_factory.new_file(Path("root", "ns", "Rainer.1.0.dsdl"), "@sealed"), + temp_dsdl_factory.new_file(Path("root", "ns", "Baker.1.0.dsdl"), "@sealed"), + Path( + "root", "ns", "Shasta.1.0.dsdl" + ), # since this isn't in the transitive closure of target dependencies it will + # never be read thus it will not be an error that it does not exist. + ] + + target_definitions = [cast(ReadableDSDLFile, _dsdl_definition.DSDLDefinition(t, t.parent)) for t in targets] + lookup_definitions = [cast(ReadableDSDLFile, _dsdl_definition.DSDLDefinition(a, a.parent)) for a in dependencies] + _ = read_definitions( + target_definitions, + lookup_definitions, + None, + True, + ) + + # make sure Shasta.1.0 was never accessed but Tacoma 1.0 was + last_item = lookup_definitions[-1] + assert isinstance(last_item, _dsdl_definition.DSDLDefinition) + assert last_item._text is None # pylint: disable=protected-access + assert lookup_definitions[0].composite_type is not None + + # Make sure text is cached. + assert lookup_definitions[0].text == lookup_definitions[0].text + + +def _unittest_namespace_reader_read_definitions_promotion(temp_dsdl_factory) -> None: # type: ignore + from . 
import _dsdl_definition + + user_1_0 = temp_dsdl_factory.new_file(Path("root", "ns", "User.1.0.dsdl"), "@sealed\n") + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "User.2.0.dsdl"), "@sealed\nns.User.1.0 old_guy\n"), + user_1_0, + ] + lookups = [user_1_0] + + definitions = read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [_dsdl_definition.DSDLDefinition(l, l.parent) for l in lookups], + None, + True, + ) + + assert len(definitions.direct) == 2 + assert len(definitions.transitive) == 0 + + +def _unittest_namespace_reader_read_definitions_no_demote(temp_dsdl_factory) -> None: # type: ignore + from . import _dsdl_definition + + user_1_0 = temp_dsdl_factory.new_file(Path("root", "ns", "User.1.0.dsdl"), "@sealed\n") + targets = [ + user_1_0, + temp_dsdl_factory.new_file(Path("root", "ns", "User.2.0.dsdl"), "@sealed\nns.User.1.0 old_guy\n"), + ] + lookups = [user_1_0] + + definitions = read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [_dsdl_definition.DSDLDefinition(l, l.parent) for l in lookups], + None, + True, + ) + + assert len(definitions.direct) == 2 + assert len(definitions.transitive) == 0 + + +def _unittest_namespace_reader_read_definitions_no_promote(temp_dsdl_factory) -> None: # type: ignore + from . 
import _dsdl_definition + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "User.2.0.dsdl"), "@sealed\nns.User.1.0 old_guy\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "User.3.0.dsdl"), "@sealed\nns.User.1.0 old_guy\n"), + ] + lookups = [temp_dsdl_factory.new_file(Path("root", "ns", "User.1.0.dsdl"), "@sealed\n")] + + definitions = read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [_dsdl_definition.DSDLDefinition(l, l.parent) for l in lookups], + None, + True, + ) + + assert len(definitions.direct) == 2 + assert len(definitions.transitive) == 1 + + +def _unittest_namespace_reader_read_definitions_twice(temp_dsdl_factory) -> None: # type: ignore + from . import _dsdl_definition + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "User.2.0.dsdl"), "@sealed\nns.User.1.0 old_guy\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "User.2.0.dsdl"), "@sealed\nns.User.1.0 old_guy\n"), + ] + lookups = [temp_dsdl_factory.new_file(Path("root", "ns", "User.1.0.dsdl"), "@sealed\n")] + + definitions = read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [_dsdl_definition.DSDLDefinition(l, l.parent) for l in lookups], + None, + True, + ) + + assert len(definitions.direct) == 1 + assert len(definitions.transitive) == 1 + + +def _unittest_namespace_reader_read_definitions_missing_dependency(temp_dsdl_factory) -> None: # type: ignore + """ + Verify that an error is raised when a dependency is missing. + """ + from pytest import raises as assert_raises + + from . 
import _dsdl_definition + from ._data_type_builder import UndefinedDataTypeError + + with assert_raises(UndefinedDataTypeError): + read_definitions( + [ + _dsdl_definition.DSDLDefinition( + f := temp_dsdl_factory.new_file( + Path("root", "ns", "Cat.1.0.dsdl"), "@sealed\nns.Birman.1.0 fluffy\n" + ), + f.parent, + ) + ], + [], + None, + True, + ) + + +def _unittest_namespace_reader_read_definitions_target_in_lookup(temp_dsdl_factory) -> None: # type: ignore + """ + Ensure the direct and transitive sets are disjoint. + """ + from . import _dsdl_definition + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Ontario.1.0.dsdl"), "@sealed\nns.NewBrunswick.1.0 place\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "NewBrunswick.1.0.dsdl"), "@sealed"), + ] + lookup = [ + temp_dsdl_factory.new_file(Path("root", "ns", "NewBrunswick.1.0.dsdl"), "@sealed"), + ] + + definitions = read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [_dsdl_definition.DSDLDefinition(l, l.parent) for l in lookup], + None, + True, + ) + + assert len(definitions.direct) == 2 + assert len(definitions.transitive) == 0 + + +def _unittest_namespace_reader_read_defs_target_dont_allow_unregulated(temp_dsdl_factory) -> None: # type: ignore + """ + Ensure that an error is raised when an invalid, fixed port ID is used without an override. + """ + from pytest import raises as assert_raises + + from . 
import _dsdl_definition + from ._data_type_builder import UnregulatedFixedPortIDError + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "845.Lice.1.0.dsdl"), "@sealed\n"), + ] + + with assert_raises(UnregulatedFixedPortIDError): + read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [], + None, + False, + ) + + +def _unittest_namespace_reader_type_error() -> None: + from pytest import raises as assert_raises + + with assert_raises(TypeError): + read_definitions( + [""], # type: ignore + [], + None, + True, + ) diff --git a/pydsdl/_parser.py b/pydsdl/_parser.py index b73a533..cc25631 100644 --- a/pydsdl/_parser.py +++ b/pydsdl/_parser.py @@ -2,13 +2,14 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko +from __future__ import annotations import typing import logging import itertools import functools import fractions from pathlib import Path -from typing import List, Tuple +from typing import cast, Tuple import parsimonious from parsimonious.nodes import Node as _Node from . import _error @@ -160,6 +161,7 @@ def generic_visit(self, node: _Node, visited_children: typing.Sequence[typing.An return tuple(visited_children) or node def visit_line(self, node: _Node, children: _Children) -> None: + _ = children if len(node.text) == 0: # Line is empty, flush comment self._flush_comment() @@ -174,6 +176,7 @@ def visit_end_of_line(self, _n: _Node, _c: _Children) -> None: visit_statement_directive = _make_typesafe_child_lifter(type(None)) # nodes are above the top level. 
def visit_comment(self, node: _Node, children: _Children) -> None: + _ = children assert isinstance(node.text, str) self._comment += "\n" if self._comment != "" else "" self._comment += node.text[2:] if node.text.startswith("# ") else node.text[1:] @@ -263,11 +266,11 @@ def visit_type_version_specifier(self, _n: _Node, children: _Children) -> _seria return _serializable.Version(major=major.as_native_integer(), minor=minor.as_native_integer()) def visit_type_primitive_truncated(self, _n: _Node, children: _Children) -> _serializable.PrimitiveType: - _kw, _sp, cons = children # type: _Node, _Node, _PrimitiveTypeConstructor + _kw, _sp, cons = cast(Tuple[_Node, _Node, _PrimitiveTypeConstructor], children) return cons(_serializable.PrimitiveType.CastMode.TRUNCATED) def visit_type_primitive_saturated(self, _n: _Node, children: _Children) -> _serializable.PrimitiveType: - _, cons = children # type: _Node, _PrimitiveTypeConstructor + _, cons = cast(Tuple[_Node, _PrimitiveTypeConstructor], children) return cons(_serializable.PrimitiveType.CastMode.SATURATED) def visit_type_primitive_name_boolean(self, _n: _Node, _c: _Children) -> _PrimitiveTypeConstructor: @@ -302,7 +305,7 @@ def visit_type_bit_length_suffix(self, node: _Node, _c: _Children) -> int: visit_op2_exp = parsimonious.NodeVisitor.lift_child def visit_expression_list(self, _n: _Node, children: _Children) -> Tuple[_expression.Any, ...]: - out = [] # type: List[_expression.Any] + out = [] # type: list[_expression.Any] if children: children = children[0] assert len(children) == 2 diff --git a/pydsdl/_serializable/_composite.py b/pydsdl/_serializable/_composite.py index 6ed71da..1153cc7 100644 --- a/pydsdl/_serializable/_composite.py +++ b/pydsdl/_serializable/_composite.py @@ -5,17 +5,15 @@ import abc import math import typing -import itertools from pathlib import Path -from .. import _expression -from .. import _port_id_ranges + +from .. 
import _expression, _port_id_ranges from .._bit_length_set import BitLengthSet +from ._attribute import Attribute, Constant, Field, PaddingField +from ._name import InvalidNameError, check_name +from ._primitive import PrimitiveType, UnsignedIntegerType from ._serializable import SerializableType, TypeParameterError -from ._attribute import Attribute, Field, PaddingField, Constant -from ._name import check_name, InvalidNameError from ._void import VoidType -from ._primitive import PrimitiveType, UnsignedIntegerType - Version = typing.NamedTuple("Version", [("major", int), ("minor", int)]) @@ -55,7 +53,7 @@ class CompositeType(SerializableType): MAX_VERSION_NUMBER = 255 NAME_COMPONENT_SEPARATOR = "." - def __init__( # pylint: disable=too-many-arguments + def __init__( # pylint: disable=too-many-arguments, too-many-locals self, name: str, version: Version, @@ -97,9 +95,26 @@ def __init__( # pylint: disable=too-many-arguments "Name is too long: %r is longer than %d characters" % (self._name, self.MAX_NAME_LENGTH) ) - for component in self._name.split(self.NAME_COMPONENT_SEPARATOR): + self._name_components = self._name.split(self.NAME_COMPONENT_SEPARATOR) + for component in self._name_components: check_name(component) + def search_up_for_root(path: Path, namespace_components: typing.List[str]) -> Path: + if namespace_components[-1] != path.stem: + raise InvalidNameError( + f"{path.stem} != {namespace_components[-1]}. 
Source file directory structure " + f"is not consistent with the type's namespace ({self._name_components}, " + f"{self._source_file_path})" + ) + if len(namespace_components) == 1: + return path + return search_up_for_root(path.parent, namespace_components[:-1]) + + self._path_to_root_namespace = search_up_for_root( + self._source_file_path.parent, + (self.namespace_components if not self._has_parent_service else self.namespace_components[:-1]), + ) + # Version check version_valid = ( (0 <= self._version.major <= self.MAX_VERSION_NUMBER) @@ -148,7 +163,12 @@ def full_name(self) -> str: @property def name_components(self) -> typing.List[str]: """Components of the full name as a list, e.g., ``['uavcan', 'node', 'Heartbeat']``.""" - return self._name.split(CompositeType.NAME_COMPONENT_SEPARATOR) + return self._name_components + + @property + def namespace_components(self) -> typing.List[str]: + """Components of the namspace as a list, e.g., ``['uavcan', 'node']``.""" + return self._name_components[:-1] @property def short_name(self) -> str: @@ -163,7 +183,7 @@ def doc(self) -> str: @property def full_namespace(self) -> str: """The full name without the short name, e.g., ``uavcan.node`` for ``uavcan.node.Heartbeat``.""" - return str(CompositeType.NAME_COMPONENT_SEPARATOR.join(self.name_components[:-1])) + return str(CompositeType.NAME_COMPONENT_SEPARATOR.join(self.namespace_components)) @property def root_namespace(self) -> str: @@ -239,10 +259,30 @@ def has_fixed_port_id(self) -> bool: @property def source_file_path(self) -> Path: """ - For synthesized types such as service request/response sections, this property is defined as an empty string. + The path to the dsdl file from which this type was read. + For synthesized types such as service request/response sections, this property is the path to the service type + since request and response types are defined within the service type's dsdl file. 
""" return self._source_file_path + @property + def source_file_path_to_root(self) -> Path: + """ + The path to the folder that is the root namespace folder for the `source_file_path` this type was read from. + The `source_file_path` will always be relative to the `source_file_path_to_root` but not all types that share + the same `root_namespace` will have the same path to their root folder since types may be contributed to a + root namespace from several different file trees. For example: + + ``` + path0 = "workspace_0/project_a/types/animal/feline/Tabby.1.0.dsdl" + path1 = "workspace_1/project_b/types/animal/canine/Boxer.1.0.dsdl" + ``` + + In these examples path0 and path1 will produce composite types with `animal` as the root namespace but both + with have different `source_file_path_to_root` paths. + """ + return self._path_to_root_namespace + @property def alignment_requirement(self) -> int: # This is more general than required by the Specification, but it is done this way in case if we decided @@ -681,19 +721,25 @@ def iterate_fields_with_offsets( raise TypeError("Service types do not have serializable fields. 
Use either request or response.") +# +--[UNIT TESTS]-----------------------------------------------------------------------------------------------------+ + + def _unittest_composite_types() -> None: # pylint: disable=too-many-statements + from typing import Optional + from pytest import raises - from ._primitive import SignedIntegerType, FloatType + from ._array import FixedLengthArrayType, VariableLengthArrayType + from ._primitive import FloatType, SignedIntegerType - def try_name(name: str) -> CompositeType: + def try_name(name: str, file_path: Optional[Path] = None) -> CompositeType: return StructureType( name=name, version=Version(0, 1), attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=file_path or Path(*name.split(".")), has_parent_service=False, ) @@ -721,6 +767,9 @@ def try_name(name: str) -> CompositeType: with raises(InvalidNameError, match="(?i).*cannot contain.*"): try_name("namespace.n-s.T") + with raises(InvalidNameError, match=".*Source file directory structure is not consistent.*"): + try_name("a.Foo", Path("foo/bar/b/Foo.0.1.dsdl")) + assert try_name("root.nested.T").full_name == "root.nested.T" assert try_name("root.nested.T").full_namespace == "root.nested" assert try_name("root.nested.T").root_namespace == "root" @@ -733,7 +782,7 @@ def try_name(name: str) -> CompositeType: attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a", "A"), has_parent_service=False, ) @@ -748,7 +797,7 @@ def try_name(name: str) -> CompositeType: ], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a", "A"), has_parent_service=False, ) @@ -762,7 +811,7 @@ def try_name(name: str) -> CompositeType: ], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("uavcan", "node", "Heartbeat"), has_parent_service=False, ) assert u["a"].name == "a" @@ -788,7 +837,7 @@ def try_name(name: str) -> 
CompositeType: ], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a", "A"), has_parent_service=False, ) assert s["a"].name == "a" @@ -847,7 +896,7 @@ def try_union_fields(field_types: typing.List[SerializableType]) -> UnionType: attributes=atr, deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a") / "A", has_parent_service=False, ) @@ -891,7 +940,7 @@ def try_union_fields(field_types: typing.List[SerializableType]) -> UnionType: # The reference values for the following test are explained in the array tests above tu8 = UnsignedIntegerType(8, cast_mode=PrimitiveType.CastMode.TRUNCATED) small = VariableLengthArrayType(tu8, 2) - outer = FixedLengthArrayType(small, 2) # unpadded bit length values: {4, 12, 20, 28, 36} + outer = FixedLengthArrayType(small, 2) # un-padded bit length values: {4, 12, 20, 28, 36} # Above plus one bit to each, plus 16-bit for the unsigned integer field assert try_union_fields( @@ -912,7 +961,7 @@ def try_struct_fields(field_types: typing.List[SerializableType]) -> StructureTy attributes=atr, deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a") / "A", has_parent_service=False, ) @@ -944,9 +993,12 @@ def try_struct_fields(field_types: typing.List[SerializableType]) -> StructureTy def _unittest_field_iterators() -> None: # pylint: disable=too-many-locals + import itertools + from pytest import raises - from ._primitive import BooleanType, FloatType + from ._array import FixedLengthArrayType, VariableLengthArrayType + from ._primitive import BooleanType, FloatType saturated = PrimitiveType.CastMode.SATURATED _seq_no = 0 @@ -960,7 +1012,7 @@ def make_type(meta: typing.Type[CompositeType], attributes: typing.Iterable[Attr attributes=attributes, deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("fake_root") / "ns" / f"Type{str(_seq_no)}", has_parent_service=False, ) @@ -1228,7 +1280,7 @@ def 
validate_iterator( attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("ns", "S_1_0.dsdl"), has_parent_service=True, ), response=StructureType( @@ -1237,7 +1289,7 @@ def validate_iterator( attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("ns", "S_1_0.dsdl"), has_parent_service=True, ), fixed_port_id=None, @@ -1251,7 +1303,7 @@ def validate_iterator( attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("ns", "XX_1_0.dsdl"), has_parent_service=True, ), response=StructureType( @@ -1260,8 +1312,31 @@ def validate_iterator( attributes=[], deprecated=True, fixed_port_id=None, - source_file_path=Path(), - has_parent_service=False, + source_file_path=Path("ns", "XX_1_0.dsdl"), + has_parent_service=True, + ), + fixed_port_id=None, + ) + + with raises(ValueError): # Request/response consistency error (internal failure) + ServiceType( + request=StructureType( + name="ns.XX.Request", + version=Version(1, 0), + attributes=[], + deprecated=False, + fixed_port_id=None, + source_file_path=Path("ns", "XX_1_0.dsdl"), + has_parent_service=True, + ), + response=StructureType( + name="ns.XX.Response", + version=Version(1, 0), + attributes=[], + deprecated=False, + fixed_port_id=None, + source_file_path=Path("ns", "YY_1_0.dsdl"), + has_parent_service=True, ), fixed_port_id=None, ) @@ -1273,7 +1348,7 @@ def validate_iterator( attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("e", "E_0_1.dsdl"), has_parent_service=False, ) validate_iterator(e, []) diff --git a/pydsdl/_test.py b/pydsdl/_test.py index 3e4048f..d3dbecb 100644 --- a/pydsdl/_test.py +++ b/pydsdl/_test.py @@ -2,10 +2,12 @@ # This software is distributed under the terms of the MIT License. 
# Author: Pavel Kirienko +# cSpell: words iceb # pylint: disable=global-statement,protected-access,too-many-statements,consider-using-with,redefined-outer-name +from __future__ import annotations import tempfile -from typing import Union, Tuple, Optional, Sequence, Type, Iterable +from typing import Sequence, Type, Iterable from pathlib import Path from textwrap import dedent import pytest # This is only safe to import in test files! @@ -28,7 +30,7 @@ def __init__(self) -> None: def directory(self) -> Path: return Path(self._tmp_dir.name) - def new(self, rel_path: Union[str, Path], text: str) -> None: + def new(self, rel_path: str | Path, text: str) -> None: """ Simply creates a new DSDL source file with the given contents at the specified path inside the workspace. """ @@ -37,7 +39,7 @@ def new(self, rel_path: Union[str, Path], text: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(text, encoding="utf8") - def parse_new(self, rel_path: Union[str, Path], text: str) -> _dsdl_definition.DSDLDefinition: + def parse_new(self, rel_path: str | Path, text: str) -> _dsdl_definition.DSDLDefinition: """ Creates a new DSDL source file with the given contents at the specified path inside the workspace, then parses it and returns the resulting definition object. 
@@ -62,6 +64,7 @@ def parse_definition( ) -> _serializable.CompositeType: return definition.read( lookup_definitions, + [], print_output_handler=lambda line, text: print("Output from line %d:" % line, text), allow_unregulated_fixed_port_id=False, ) @@ -314,7 +317,7 @@ def _unittest_comments(wrkspc: Workspace) -> None: uint8 CHARACTER = '#' # comment on constant int8 a # comment on field - int8 aprime + int8 a_prime @assert 1 == 1 # toss one in for confusion void2 # comment on padding field saturated int64[<33] b @@ -422,7 +425,7 @@ def _unittest_error(wrkspc: Workspace) -> None: def standalone(rel_path: str, definition: str, allow_unregulated: bool = False) -> _serializable.CompositeType: return wrkspc.parse_new(rel_path, definition + "\n").read( - [], lambda *_: None, allow_unregulated + [], [], lambda *_: None, allow_unregulated ) # pragma: no branch with raises(_error.InvalidDefinitionError, match="(?i).*port ID.*"): @@ -745,7 +748,7 @@ def standalone(rel_path: str, definition: str, allow_unregulated: bool = False) def _unittest_print(wrkspc: Workspace) -> None: - printed_items = None # type: Optional[Tuple[int, str]] + printed_items = None # type: tuple[int, str] | None def print_handler(line_number: int, text: str) -> None: nonlocal printed_items @@ -754,20 +757,20 @@ def print_handler(line_number: int, text: str) -> None: wrkspc.parse_new( "ns/A.1.0.dsdl", "# line number 1\n" "# line number 2\n" "@print 2 + 2 == 4 # line number 3\n" "# line number 4\n" "@sealed\n", - ).read([], print_handler, False) + ).read([], [], print_handler, False) assert printed_items assert printed_items[0] == 3 assert printed_items[1] == "true" - wrkspc.parse_new("ns/B.1.0.dsdl", "@print false\n@sealed").read([], print_handler, False) + wrkspc.parse_new("ns/B.1.0.dsdl", "@print false\n@sealed").read([], [], print_handler, False) assert printed_items assert printed_items[0] == 1 assert printed_items[1] == "false" wrkspc.parse_new( "ns/Offset.1.0.dsdl", "@print _offset_ # Not 
recorded\n" "uint8 a\n" "@print _offset_\n" "@extent 800\n" - ).read([], print_handler, False) + ).read([], [], print_handler, False) assert printed_items assert printed_items[0] == 3 assert printed_items[1] == "{8}" @@ -989,7 +992,7 @@ def _unittest_assert(wrkspc: Workspace) -> None: def _unittest_parse_namespace(wrkspc: Workspace) -> None: from pytest import raises - print_output = None # type: Optional[Tuple[str, int, str]] + print_output = None # type: tuple[str, int, str] | None def print_handler(d: Path, line: int, text: str) -> None: nonlocal print_output