diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..d939d93 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "test/public_regulated_data_types"] + path = test/public_regulated_data_types + url = https://github.com/OpenCyphal/public_regulated_data_types.git diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..644dbd0 --- /dev/null +++ b/conftest.py @@ -0,0 +1,71 @@ +# +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. +# SPDX-License-Identifier: MIT +# +""" +Configuration for pytest tests including fixtures and hooks. +""" + +import tempfile +from pathlib import Path +from typing import Any, Optional + +import pytest + + +# +-------------------------------------------------------------------------------------------------------------------+ +# | TEST FIXTURES +# +-------------------------------------------------------------------------------------------------------------------+ +class TemporaryDsdlContext: + """ + Powers the temp_dsdl_factory test fixture. + """ + def __init__(self) -> None: + self._base_dir: Optional[Any] = None + + def new_file(self, file_path: Path, text: str | None = None) -> Path: + if file_path.is_absolute(): + raise ValueError(f"{file_path} is an absolute path. The test fixture requires relative paths to work.") + file = self.base_dir / file_path + file.parent.mkdir(parents=True, exist_ok=True) + if text is not None: + file.write_text(text) + return file + + @property + def base_dir(self) -> Path: + if self._base_dir is None: + self._base_dir = tempfile.TemporaryDirectory() + return Path(self._base_dir.name).resolve() + + def _test_path_finalizer(self) -> None: + """ + Finalizer to clean up any temporary directories created during the test. 
+ """ + if self._base_dir is not None: + self._base_dir.cleanup() + del self._base_dir + self._base_dir = None + +@pytest.fixture(scope="function") +def temp_dsdl_factory(request: pytest.FixtureRequest) -> Any: # pylint: disable=unused-argument + """ + Fixture for pydsdl tests that have to create files as part of the test. This object stays in-scope for a given + test method and does not require a context manager in the test itself. + + Call `new_file(path)` to create a new file path in the fixture's temporary directory. This will create all + uncreated parent directories but will _not_ create the file unless text is provided: `new_file(path, "hello")` + """ + f = TemporaryDsdlContext() + request.addfinalizer(f._test_path_finalizer) # pylint: disable=protected-access + return f + + + +@pytest.fixture +def public_types() -> Path: + """ + Path to the public regulated data types directory used for tests. + """ + return Path("test") / "public_regulated_data_types" / "uavcan" diff --git a/pydsdl/__init__.py b/pydsdl/__init__.py index d07f1f2..24ba9d4 100644 --- a/pydsdl/__init__.py +++ b/pydsdl/__init__.py @@ -25,8 +25,9 @@ _sys.path = [str(_Path(__file__).parent / "third_party")] + _sys.path # Never import anything that is not available here - API stability guarantees are only provided for the exposed items. +from ._dsdl import PrintOutputHandler as PrintOutputHandler from ._namespace import read_namespace as read_namespace -from ._namespace import PrintOutputHandler as PrintOutputHandler +from ._namespace import read_files as read_files # Error model. from ._error import FrontendError as FrontendError diff --git a/pydsdl/_data_type_builder.py b/pydsdl/_data_type_builder.py index 4572da3..49ee3ed 100644 --- a/pydsdl/_data_type_builder.py +++ b/pydsdl/_data_type_builder.py @@ -2,16 +2,12 @@ # This software is distributed under the terms of the MIT License. 
# Author: Pavel Kirienko -from typing import Optional, Callable, Iterable import logging from pathlib import Path -from . import _serializable -from . import _expression -from . import _error -from . import _dsdl_definition -from . import _parser -from . import _data_schema_builder -from . import _port_id_ranges +from typing import Callable, Iterable, Optional + +from . import _data_schema_builder, _error, _expression, _parser, _port_id_ranges, _serializable +from ._dsdl import DefinitionVisitor, DsdlFileBuildable class AssertionCheckFailureError(_error.InvalidDefinitionError): @@ -42,21 +38,25 @@ class MissingSerializationModeError(_error.InvalidDefinitionError): class DataTypeBuilder(_parser.StatementStreamProcessor): + + # pylint: disable=too-many-arguments def __init__( self, - definition: _dsdl_definition.DSDLDefinition, - lookup_definitions: Iterable[_dsdl_definition.DSDLDefinition], + definition: DsdlFileBuildable, + lookup_definitions: Iterable[DsdlFileBuildable], + definition_visitors: Iterable[DefinitionVisitor], print_output_handler: Callable[[int, str], None], allow_unregulated_fixed_port_id: bool, ): self._definition = definition self._lookup_definitions = list(lookup_definitions) + self._definition_visitors = definition_visitors self._print_output_handler = print_output_handler self._allow_unregulated_fixed_port_id = allow_unregulated_fixed_port_id self._element_callback = None # type: Optional[Callable[[str], None]] - assert isinstance(self._definition, _dsdl_definition.DSDLDefinition) - assert all(map(lambda x: isinstance(x, _dsdl_definition.DSDLDefinition), lookup_definitions)) + assert isinstance(self._definition, DsdlFileBuildable) + assert all(map(lambda x: isinstance(x, DsdlFileBuildable), lookup_definitions)) assert callable(self._print_output_handler) assert isinstance(self._allow_unregulated_fixed_port_id, bool) @@ -198,6 +198,7 @@ def resolve_versioned_data_type(self, name: str, version: _serializable.Version) del name found = 
list(filter(lambda d: d.full_name == full_name and d.version == version, self._lookup_definitions)) if not found: + # Play Sherlock to help the user with mistakes like https://forum.opencyphal.org/t/904/2 requested_ns = full_name.split(_serializable.CompositeType.NAME_COMPONENT_SEPARATOR)[0] lookup_nss = set(x.root_namespace for x in self._lookup_definitions) @@ -221,15 +222,20 @@ def resolve_versioned_data_type(self, name: str, version: _serializable.Version) raise _error.InternalError("Conflicting definitions: %r" % found) target_definition = found[0] - assert isinstance(target_definition, _dsdl_definition.DSDLDefinition) + for visitor in self._definition_visitors: + visitor(self._definition, target_definition) + + assert isinstance(target_definition, DsdlFileBuildable) assert target_definition.full_name == full_name assert target_definition.version == version # Recursion is cool. - return target_definition.read( + dt = target_definition.read( lookup_definitions=self._lookup_definitions, + definition_visitors=self._definition_visitors, print_output_handler=self._print_output_handler, allow_unregulated_fixed_port_id=self._allow_unregulated_fixed_port_id, ) + return dt def _queue_attribute(self, element_callback: Callable[[str], None]) -> None: self._flush_attribute("") @@ -266,7 +272,9 @@ def _on_assert_directive(self, line_number: int, value: Optional[_expression.Any elif value is None: raise InvalidDirectiveError("Assert directive requires an expression") else: - raise InvalidDirectiveError("The assertion check expression must yield a boolean, not %s" % value.TYPE_NAME) + raise InvalidDirectiveError( + "The assertion check expression must yield a boolean, not %s" % value.TYPE_NAME + ) def _on_extent_directive(self, line_number: int, value: Optional[_expression.Any]) -> None: if self._structs[-1].serialization_mode is not None: @@ -300,7 +308,9 @@ def _on_union_directive(self, _ln: int, value: Optional[_expression.Any]) -> Non if self._structs[-1].union: raise 
InvalidDirectiveError("Duplicated union directive") if self._structs[-1].attributes: - raise InvalidDirectiveError("The union directive must be placed before the first " "attribute definition") + raise InvalidDirectiveError( + "The union directive must be placed before the first " "attribute definition" + ) self._structs[-1].make_union() def _on_deprecated_directive(self, _ln: int, value: Optional[_expression.Any]) -> None: diff --git a/pydsdl/_dsdl.py b/pydsdl/_dsdl.py new file mode 100644 index 0000000..82723d7 --- /dev/null +++ b/pydsdl/_dsdl.py @@ -0,0 +1,268 @@ +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. +# SPDX-License-Identifier: MIT + +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Any, Callable, Iterable, List, Optional, Set, Tuple, TypeVar, Union + +from ._serializable import CompositeType, Version + +PrintOutputHandler = Callable[[Path, int, str], None] +"""Invoked when the frontend encounters a print directive or needs to output a generic diagnostic.""" + + +class DsdlFile(ABC): + """ + Interface for DSDL files. This interface is used by the parser to abstract DSDL type details inferred from the + filesystem. Where properties are duplicated between the composite type and this file the composite type is to be + considered canonical. The properties directly on this class are inferred from the dsdl file path before the + composite type has been parsed. 
+ """ + + @property + @abstractmethod + def composite_type(self) -> Optional[CompositeType]: + """The composite type that was read from the DSDL file or None if the type has not been parsed yet.""" + raise NotImplementedError() + + @property + @abstractmethod + def full_name(self) -> str: + """The full name, e.g., uavcan.node.Heartbeat""" + raise NotImplementedError() + + @property + def name_components(self) -> List[str]: + """Components of the full name as a list, e.g., ['uavcan', 'node', 'Heartbeat']""" + raise NotImplementedError() + + @property + @abstractmethod + def short_name(self) -> str: + """The last component of the full name, e.g., Heartbeat of uavcan.node.Heartbeat""" + raise NotImplementedError() + + @property + @abstractmethod + def full_namespace(self) -> str: + """The full name without the short name, e.g., uavcan.node for uavcan.node.Heartbeat""" + raise NotImplementedError() + + @property + @abstractmethod + def root_namespace(self) -> str: + """The first component of the full name, e.g., uavcan of uavcan.node.Heartbeat""" + raise NotImplementedError() + + @property + @abstractmethod + def text(self) -> str: + """The source text in its raw unprocessed form (with comments, formatting intact, and everything)""" + raise NotImplementedError() + + @property + @abstractmethod + def version(self) -> Version: + """ + The version of the DSDL definition. + """ + raise NotImplementedError() + + @property + @abstractmethod + def fixed_port_id(self) -> Optional[int]: + """Either the fixed port ID as integer, or None if not defined for this type.""" + raise NotImplementedError() + + @property + @abstractmethod + def has_fixed_port_id(self) -> bool: + """ + If the type has a fixed port ID defined, this method returns True. Equivalent to ``fixed_port_id is not None``. 
+ """ + raise NotImplementedError() + + @property + @abstractmethod + def file_path(self) -> Path: + """The path to the DSDL file on the filesystem.""" + raise NotImplementedError() + + @property + @abstractmethod + def root_namespace_path(self) -> Path: + """ + The path to the root namespace directory on the filesystem. + """ + raise NotImplementedError() + + @abstractmethod + def get_composite_type(self) -> CompositeType: + """ + Returns the composite type of the DSDL file or raises an exception if the type has not been parsed yet. + :return: The composite type. + :raises InvalidDefinitionError: If the type has not been parsed yet. + """ + raise NotImplementedError() + + +class DsdlFileBuildable(DsdlFile): + """ + A DSDL file that can construct a composite type from its contents. + """ + + @abstractmethod + def read( + self, + lookup_definitions: Iterable["DsdlFileBuildable"], + definition_visitors: Iterable["DefinitionVisitor"], + print_output_handler: Callable[[int, str], None], + allow_unregulated_fixed_port_id: bool, + ) -> CompositeType: + """ + Reads the data type definition and returns its high-level data type representation. + The output should be cached; all following invocations should read from this cache. + Caching is very important, because it is expected that the same definition may be referred to multiple + times (e.g., for composition or when accessing external constants). Re-processing a definition every time + it is accessed would be a huge waste of time. + Note, however, that this may lead to unexpected complications if one is attempting to re-read a definition + with different inputs (e.g., different lookup paths) expecting to get a different result: caching would + get in the way. That issue is easy to avoid by creating a new instance of the object. + :param lookup_definitions: List of definitions available for referring to. + :param definition_visitors: Visitors to notify about discovered dependencies. 
+ :param print_output_handler: Used for @print and for diagnostics: (line_number, text) -> None. + :param allow_unregulated_fixed_port_id: Do not complain about fixed unregulated port IDs. + :return: The data type representation. + """ + raise NotImplementedError() + + +DefinitionVisitor = Callable[[DsdlFile, DsdlFileBuildable], None] +""" +Called by the parser after it finds a dependent type but before it parses a file in a lookup namespace. +:param DsdlFile argument 0: The target DSDL file that has dependencies the parser is searching for. +:param DsdlFile argument 1: The dependency of target_dsdl_file that the parser is about to parse. +""" + +SortedFileT = TypeVar("SortedFileT", DsdlFile, DsdlFileBuildable, CompositeType) +SortedFileList = List[SortedFileT] +"""A list of DSDL files sorted by name, newest version first.""" + +FileSortKey: Callable[[SortedFileT], Tuple[str, int, int]] = lambda d: ( + d.full_name, + -d.version.major, + -d.version.minor, +) + + +def file_sort(file_list: Iterable[SortedFileT]) -> SortedFileList: + """ + Sorts a list of DSDL files lexicographically by name, newest version first. + """ + return list(sorted(file_list, key=FileSortKey)) + + +def normalize_paths_argument_to_list( + namespaces_or_namespace: Union[None, Path, str, Iterable[Union[Path, str]]], +) -> List[Path]: + """ + Normalizes the input argument to a list of paths. + """ + if namespaces_or_namespace is None: + return [] + if isinstance(namespaces_or_namespace, (Path, str)): + return [Path(namespaces_or_namespace)] + + def _convert(arg: Any) -> Path: + if not isinstance(arg, (str, Path)): + raise TypeError(f"Invalid type: {type(arg)}") + return Path(arg) if isinstance(arg, str) else arg + + return [_convert(arg) for arg in namespaces_or_namespace] + + +def normalize_paths_argument_to_set( + namespaces_or_namespace: Union[None, Path, str, Iterable[Union[Path, str]]], +) -> Set[Path]: + """ + Normalizes the input argument to a set of paths. 
+ """ + return set(normalize_paths_argument_to_list(namespaces_or_namespace)) + + +# +-[UNIT TESTS]------------------------------------------------------------------------------------------------------+ + + +def _unittest_dsdl_normalize_paths_argument_to_list() -> None: + + from pytest import raises as assert_raises + + # Test with None argument + result = normalize_paths_argument_to_list(None) + assert result == [] + + # Test with single string argument + result = normalize_paths_argument_to_list("path/to/namespace") + assert result == [Path("path/to/namespace")] + + # Test with single Path argument + result = normalize_paths_argument_to_list(Path("path/to/namespace")) + assert result == [Path("path/to/namespace")] + + # Test with list of strings argument + result = normalize_paths_argument_to_list(["path/to/namespace1", "path/to/namespace2"]) + assert result == [Path("path/to/namespace1"), Path("path/to/namespace2")] + + # Test with list of Path arguments + result = normalize_paths_argument_to_list([Path("path/to/namespace1"), Path("path/to/namespace2")]) + assert result == [Path("path/to/namespace1"), Path("path/to/namespace2")] + + # Test with mixed list of strings and Path arguments + result = normalize_paths_argument_to_list(["path/to/namespace1", Path("path/to/namespace2")]) + assert result == [Path("path/to/namespace1"), Path("path/to/namespace2")] + + # Test with invalid argument type + with assert_raises(TypeError): + normalize_paths_argument_to_list(42) # type: ignore + + # Test with invalid argument type + with assert_raises(TypeError): + normalize_paths_argument_to_list([42]) # type: ignore + + +def _unittest_dsdl_normalize_paths_argument_to_set() -> None: + + from pytest import raises as assert_raises + + # Test with None argument + result = normalize_paths_argument_to_set(None) + assert result == set() + + # Test with single string argument + result = normalize_paths_argument_to_set("path/to/namespace") + assert result == {Path("path/to/namespace")} 
+ + # Test with single Path argument + result = normalize_paths_argument_to_set(Path("path/to/namespace")) + assert result == {Path("path/to/namespace")} + + # Test with list of strings argument + result = normalize_paths_argument_to_set(["path/to/namespace1", "path/to/namespace2"]) + assert result == {Path("path/to/namespace1"), Path("path/to/namespace2")} + + # Test with list of Path arguments + result = normalize_paths_argument_to_set([Path("path/to/namespace1"), Path("path/to/namespace2")]) + assert result == {Path("path/to/namespace1"), Path("path/to/namespace2")} + + # Test with mixed list of strings and Path arguments + result = normalize_paths_argument_to_set(["path/to/namespace1", Path("path/to/namespace2")]) + assert result == {Path("path/to/namespace1"), Path("path/to/namespace2")} + + # Test with invalid argument type + with assert_raises(TypeError): + normalize_paths_argument_to_set(42) # type: ignore + + # Test with invalid argument type + with assert_raises(TypeError): + normalize_paths_argument_to_set([42]) # type: ignore diff --git a/pydsdl/_dsdl_definition.py b/pydsdl/_dsdl_definition.py index a8114da..41df769 100644 --- a/pydsdl/_dsdl_definition.py +++ b/pydsdl/_dsdl_definition.py @@ -2,14 +2,16 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko -import time -from typing import Iterable, Callable, Optional, List import logging +import time from pathlib import Path -from ._error import FrontendError, InvalidDefinitionError, InternalError -from ._serializable import CompositeType, Version -from . import _parser +from typing import Callable, Iterable, List, Optional +from . 
import _parser +from ._data_type_builder import DataTypeBuilder +from ._dsdl import DefinitionVisitor, DsdlFileBuildable +from ._error import FrontendError, InternalError, InvalidDefinitionError +from ._serializable import CompositeType, Version _logger = logging.getLogger(__name__) @@ -23,7 +25,7 @@ def __init__(self, text: str, path: Path): super().__init__(text=text, path=Path(path)) -class DSDLDefinition: +class DSDLDefinition(DsdlFileBuildable): """ A DSDL type definition source abstracts the filesystem level details away, presenting a higher-level interface that operates solely on the level of type names, namespaces, fixed identifiers, and so on. @@ -36,15 +38,18 @@ def __init__(self, file_path: Path, root_namespace_path: Path): del file_path self._root_namespace_path = Path(root_namespace_path) del root_namespace_path - with open(self._file_path) as f: - self._text = str(f.read()) + self._text: Optional[str] = None # Checking the sanity of the root directory path - can't contain separators if CompositeType.NAME_COMPONENT_SEPARATOR in self._root_namespace_path.name: raise FileNameFormatError("Invalid namespace name", path=self._root_namespace_path) # Determining the relative path within the root namespace directory - relative_path = self._root_namespace_path.name / self._file_path.relative_to(self._root_namespace_path) + try: + relative_path = self._root_namespace_path.name / self._file_path.relative_to(self._root_namespace_path) + except ValueError: + # The file is not under the root namespace path, so we fall back to assuming it resides directly under it. + relative_path = Path(self._root_namespace_path.name) / self._file_path.name # Parsing the basename, e.g., 434.GetTransportStatistics.0.1.dsdl basename_components = relative_path.name.split(".")[:-1] @@ -86,31 +91,24 @@ def __init__(self, file_path: Path, root_namespace_path: Path): self._cached_type: Optional[CompositeType] = None + # +-----------------------------------------------------------------------+ + # | 
DsdlFileBuildable :: INTERFACE | + # +-----------------------------------------------------------------------+ def read( self, - lookup_definitions: Iterable["DSDLDefinition"], + lookup_definitions: Iterable[DsdlFileBuildable], + definition_visitors: Iterable[DefinitionVisitor], print_output_handler: Callable[[int, str], None], allow_unregulated_fixed_port_id: bool, ) -> CompositeType: - """ - Reads the data type definition and returns its high-level data type representation. - The output is cached; all following invocations will read from the cache. - Caching is very important, because it is expected that the same definition may be referred to multiple - times (e.g., for composition or when accessing external constants). Re-processing a definition every time - it is accessed would be a huge waste of time. - Note, however, that this may lead to unexpected complications if one is attempting to re-read a definition - with different inputs (e.g., different lookup paths) expecting to get a different result: caching would - get in the way. That issue is easy to avoid by creating a new instance of the object. - :param lookup_definitions: List of definitions available for referring to. - :param print_output_handler: Used for @print and for diagnostics: (line_number, text) -> None. - :param allow_unregulated_fixed_port_id: Do not complain about fixed unregulated port IDs. - :return: The data type representation. 
- """ log_prefix = "%s.%d.%d" % (self.full_name, self.version.major, self.version.minor) if self._cached_type is not None: _logger.debug("%s: Cache hit", log_prefix) return self._cached_type + if not self._file_path.exists(): + raise InvalidDefinitionError("Attempt to read DSDL file that doesn't exist.", self._file_path) + started_at = time.monotonic() # Remove the target definition from the lookup list in order to prevent @@ -124,17 +122,17 @@ def read( ", ".join(set(sorted(map(lambda x: x.root_namespace, lookup_definitions)))), ) try: - builder = _data_type_builder.DataTypeBuilder( + builder = DataTypeBuilder( definition=self, lookup_definitions=lookup_definitions, + definition_visitors=definition_visitors, print_output_handler=print_output_handler, allow_unregulated_fixed_port_id=allow_unregulated_fixed_port_id, ) - with open(self.file_path) as f: - _parser.parse(f.read(), builder) - self._cached_type = builder.finalize() + _parser.parse(self.text, builder) + self._cached_type = builder.finalize() _logger.info( "%s: Processed in %.0f ms; category: %s, fixed port ID: %s", log_prefix, @@ -151,34 +149,38 @@ def read( except Exception as ex: # pragma: no cover raise InternalError(culprit=ex, path=self.file_path) from ex + # +-----------------------------------------------------------------------+ + # | DsdlFile :: INTERFACE | + # +-----------------------------------------------------------------------+ + @property + def composite_type(self) -> Optional[CompositeType]: + return self._cached_type + @property def full_name(self) -> str: - """The full name, e.g., uavcan.node.Heartbeat""" return self._name @property def name_components(self) -> List[str]: - """Components of the full name as a list, e.g., ['uavcan', 'node', 'Heartbeat']""" return self._name.split(CompositeType.NAME_COMPONENT_SEPARATOR) @property def short_name(self) -> str: - """The last component of the full name, e.g., Heartbeat of uavcan.node.Heartbeat""" return self.name_components[-1] @property def 
full_namespace(self) -> str: - """The full name without the short name, e.g., uavcan.node for uavcan.node.Heartbeat""" return str(CompositeType.NAME_COMPONENT_SEPARATOR.join(self.name_components[:-1])) @property def root_namespace(self) -> str: - """The first component of the full name, e.g., uavcan of uavcan.node.Heartbeat""" return self.name_components[0] @property def text(self) -> str: - """The source text in its raw unprocessed form (with comments, formatting intact, and everything)""" + if self._text is None: + with open(self._file_path) as f: + self._text = str(f.read()) return self._text @property @@ -187,7 +189,6 @@ def version(self) -> Version: @property def fixed_port_id(self) -> Optional[int]: - """Either the fixed port ID as integer, or None if not defined for this type.""" return self._fixed_port_id @property @@ -202,6 +203,17 @@ def file_path(self) -> Path: def root_namespace_path(self) -> Path: return self._root_namespace_path + def get_composite_type(self) -> CompositeType: + if self._cached_type is None: + raise InvalidDefinitionError("The definition has not been read yet", self.file_path) + return self._cached_type + + # +-----------------------------------------------------------------------+ + # | Python :: SPECIAL FUNCTIONS | + # +-----------------------------------------------------------------------+ + def __hash__(self) -> int: + return hash((self.full_name, self.version)) + def __eq__(self, other: object) -> bool: """ Two definitions will compare equal if they share the same name AND version number. @@ -222,6 +234,25 @@ def __str__(self) -> str: __repr__ = __str__ -# Moved this import here to break recursive dependency. -# Maybe I have messed up the architecture? Should think about it later. -from . 
import _data_type_builder # pylint: disable=wrong-import-position +# +-[UNIT TESTS]------------------------------------------------------------------------------------------------------+ + + +def _unittest_dsdl_definition_read_non_existant() -> None: + from pytest import raises as expect_raises + + target = Path("root", "ns", "Target.1.1.dsdl") + target_definition = DSDLDefinition(target, target.parent) + + def print_output(line_number: int, text: str) -> None: + pass + + with expect_raises(InvalidDefinitionError): + target_definition.read([], [], print_output, True) + + +def _unittest_dsdl_definition_read_text(temp_dsdl_factory) -> None: # type: ignore + target_root = Path("root", "ns") + target_file_path = Path(target_root / "Target.1.1.dsdl") + dsdl_file = temp_dsdl_factory.new_file(target_root / target_file_path, "@sealed") + target_definition = DSDLDefinition(dsdl_file, target_root) + assert "@sealed" == target_definition.text diff --git a/pydsdl/_error.py b/pydsdl/_error.py index d301765..9222621 100644 --- a/pydsdl/_error.py +++ b/pydsdl/_error.py @@ -108,6 +108,9 @@ class InvalidDefinitionError(FrontendError): """ +# +-[UNIT TESTS]------------------------------------------------------------------------------------------------------+ + + def _unittest_error() -> None: try: raise FrontendError("Hello world!") @@ -124,8 +127,8 @@ def _unittest_error() -> None: try: raise FrontendError("Hello world!", path=Path("path/to/file.dsdl")) except Exception as ex: - assert str(ex) == "path/to/file.dsdl: Hello world!" assert repr(ex) == "FrontendError: 'path/to/file.dsdl: Hello world!'" + assert str(ex) == "path/to/file.dsdl: Hello world!" 
def _unittest_internal_error_github_reporting() -> None: diff --git a/pydsdl/_namespace.py b/pydsdl/_namespace.py index 8e9501e..c26ce8e 100644 --- a/pydsdl/_namespace.py +++ b/pydsdl/_namespace.py @@ -4,13 +4,19 @@ # pylint: disable=logging-not-lazy -from typing import Iterable, Callable, DefaultDict, List, Optional, Union, Set, Dict -import logging import collections +import logging from pathlib import Path -from . import _serializable -from . import _dsdl_definition -from . import _error +from typing import Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple, Union, cast + +from . import _dsdl_definition, _error, _serializable +from ._dsdl import DsdlFileBuildable, PrintOutputHandler, SortedFileList +from ._dsdl import file_sort as dsdl_file_sort +from ._dsdl import normalize_paths_argument_to_list, normalize_paths_argument_to_set +from ._namespace_reader import read_definitions, DsdlDefinitions + + +_logger = logging.getLogger(__name__) class RootNamespaceNameCollisionError(_error.InvalidDefinitionError): @@ -69,8 +75,13 @@ class SealingConsistencyError(_error.InvalidDefinitionError): """ -PrintOutputHandler = Callable[[Path, int, str], None] -"""Invoked when the frontend encounters a print directive or needs to output a generic diagnostic.""" +class DsdlPathInferenceError(_error.InvalidDefinitionError): + """ + Raised when the namespace, type, fixed port ID, or version cannot be inferred from a file path. + """ + + +# +--[PUBLIC API]-----------------------------------------------------------------------------------------------------+ def read_namespace( @@ -81,7 +92,7 @@ def read_namespace( allow_root_namespace_name_collision: bool = True, ) -> List[_serializable.CompositeType]: """ - This function is the main entry point of the library. + This function is a main entry point for the library. It reads all DSDL definitions from the specified root namespace directory and produces the annotated AST. 
:param root_namespace_directory: The path of the root namespace directory that will be read. @@ -108,48 +119,26 @@ def read_namespace( the same root namespace name multiple times in the lookup dirs. This will enable defining a namespace partially and let other entities define new messages or new sub-namespaces in the same root namespace. - :return: A list of :class:`pydsdl.CompositeType` sorted lexicographically by full data type name, - then by major version (newest version first), then by minor version (newest version first). - The ordering guarantee allows the caller to always find the newest version simply by picking - the first matching occurrence. + :return: A list of :class:`pydsdl.CompositeType` found under the `root_namespace_directory` and sorted + lexicographically by full data type name, then by major version (newest version first), then by minor + version (newest version first). The ordering guarantee allows the caller to always find the newest version + simply by picking the first matching occurrence. :raises: :class:`pydsdl.FrontendError`, :class:`MemoryError`, :class:`SystemError`, :class:`OSError` if directories do not exist or inaccessible, :class:`ValueError`/:class:`TypeError` if the arguments are invalid. """ - # Add the own root namespace to the set of lookup directories, sort lexicographically, remove duplicates. - # We'd like this to be an iterable list of strings but we handle the common practice of passing in a single path. - if lookup_directories is None: - lookup_directories_path_list: List[Path] = [] - elif isinstance(lookup_directories, (str, bytes, Path)): - lookup_directories_path_list = [Path(lookup_directories)] - else: - lookup_directories_path_list = list(map(Path, lookup_directories)) - - for a in lookup_directories_path_list: - if not isinstance(a, (str, Path)): - raise TypeError("Lookup directories shall be an iterable of paths. 
Found in list: " + type(a).__name__) - _logger.debug(_LOG_LIST_ITEM_PREFIX + str(a)) - # Normalize paths and remove duplicates. Resolve symlinks to avoid ambiguities. root_namespace_directory = Path(root_namespace_directory).resolve() - lookup_directories_path_list.append(root_namespace_directory) - lookup_directories_path_list = list(sorted({x.resolve() for x in lookup_directories_path_list})) - _logger.debug("Lookup directories are listed below:") - for a in lookup_directories_path_list: - _logger.debug(_LOG_LIST_ITEM_PREFIX + str(a)) - # Check for common usage errors and warn the user if anything looks suspicious. - _ensure_no_common_usage_errors(root_namespace_directory, lookup_directories_path_list, _logger.warning) - - # Check the namespaces. - _ensure_no_nested_root_namespaces(lookup_directories_path_list) - - if not allow_root_namespace_name_collision: - _ensure_no_namespace_name_collisions(lookup_directories_path_list) + lookup_directories_path_list = _construct_lookup_directories_path_list( + [root_namespace_directory], + normalize_paths_argument_to_list(lookup_directories), + allow_root_namespace_name_collision, + ) # Construct DSDL definitions from the target and the lookup dirs. 
- target_dsdl_definitions = _construct_dsdl_definitions_from_namespace(root_namespace_directory) + target_dsdl_definitions = _construct_dsdl_definitions_from_namespaces([root_namespace_directory]) if not target_dsdl_definitions: _logger.info("The namespace at %s is empty", root_namespace_directory) return [] @@ -157,9 +146,120 @@ def read_namespace( for x in target_dsdl_definitions: _logger.debug(_LOG_LIST_ITEM_PREFIX + str(x)) - lookup_dsdl_definitions = [] # type: List[_dsdl_definition.DSDLDefinition] - for ld in lookup_directories_path_list: - lookup_dsdl_definitions += _construct_dsdl_definitions_from_namespace(ld) + return _complete_read_function( + target_dsdl_definitions, lookup_directories_path_list, print_output_handler, allow_unregulated_fixed_port_id + ).direct + + +# pylint: disable=too-many-arguments +def read_files( + dsdl_files: Union[None, Path, str, Iterable[Union[Path, str]]], + root_namespace_directories_or_names: Union[None, Path, str, Iterable[Union[Path, str]]], + lookup_directories: Union[None, Path, str, Iterable[Union[Path, str]]] = None, + print_output_handler: Optional[PrintOutputHandler] = None, + allow_unregulated_fixed_port_id: bool = False, +) -> Tuple[List[_serializable.CompositeType], List[_serializable.CompositeType]]: + """ + This function is a main entry point for the library. + It reads all DSDL definitions from the specified `dsdl_files` and produces the annotated AST for these types and + the transitive closure of the types they depend on. + + :param dsdl_files: A list of paths to dsdl files to parse. + + :param root_namespace_directories_or_names: This can be a set of names of root namespaces or relative paths to + root namespaces. All `dsdl_files` provided must be under one of these roots. 
For example, given: + + ``` + dsdl_files = [ + Path("workspace/project/types/animals/felines/Tabby.1.0"), + Path("workspace/project/types/animals/canines/Boxer.1.0") + Path("workspace/project/types/plants/trees/DouglasFir.1.0") + ] + ``` + + then this argument must be one of: + + ``` + root_namespace_directories_or_names = ["animals", "plants"] + root_namespace_directories_or_names = [ + Path("workspace/project/types/animals"), + Path("workspace/project/types/plants") + ] + ``` + + :param lookup_directories: List of other namespace directories containing data type definitions that are + referred to from the target dsdl files. For example, if you are reading vendor-specific types, + the list of lookup directories should always include a path to the standard root namespace ``uavcan``, + otherwise the types defined in the vendor-specific namespace won't be able to use data types from the + standard namespace. + + :param print_output_handler: If provided, this callable will be invoked when a ``@print`` directive + is encountered or when the frontend needs to emit a diagnostic; + the arguments are: path, line number (1-based), text. + If not provided, no output will be produced except for the standard Python logging subsystem + (but ``@print`` expressions will be evaluated anyway, and a failed evaluation will be a fatal error). + + :param allow_unregulated_fixed_port_id: Do not reject unregulated fixed port identifiers. + As demanded by the specification, the frontend rejects unregulated fixed port ID by default. + This is a dangerous feature that must not be used unless you understand the risks. + Please read https://opencyphal.org/guide. + + :return: A Tuple of lists of :class:`pydsdl.CompositeType`. The first index in the Tuple are the types parsed from + the `dsdl_files` argument. The second index are types that the target `dsdl_files` utilizes. 
+ A note for using these values to describe build dependencies: each :class:`pydsdl.CompositeType` has two + fields that provide links back to the filesystem where the dsdl files read when parsing the type were found; + `source_file_path` and `source_file_path_to_root`. + + :raises: :class:`pydsdl.FrontendError`, :class:`MemoryError`, :class:`SystemError`, + :class:`OSError` if directories do not exist or inaccessible, + :class:`ValueError`/:class:`TypeError` if the arguments are invalid. + """ + # Normalize paths and remove duplicates. Resolve symlinks to avoid ambiguities. + target_dsdl_definitions = _construct_dsdl_definitions_from_files( + normalize_paths_argument_to_list(dsdl_files), + normalize_paths_argument_to_set(root_namespace_directories_or_names), + ) + if len(target_dsdl_definitions) == 0: + _logger.info("No DSDL files found in the specified directories") + return ([], []) + _logger.debug("Target DSDL definitions are listed below:") + + if _logger.isEnabledFor(logging.DEBUG): + for x in target_dsdl_definitions: + _logger.debug(_LOG_LIST_ITEM_PREFIX + str(x.file_path)) + + root_namespaces = {f.root_namespace_path.resolve() for f in target_dsdl_definitions} + lookup_directories_path_list = _construct_lookup_directories_path_list( + root_namespaces, + normalize_paths_argument_to_list(lookup_directories), + True, + ) + + definitions = _complete_read_function( + target_dsdl_definitions, lookup_directories_path_list, print_output_handler, allow_unregulated_fixed_port_id + ) + + return (definitions.direct, definitions.transitive) + + +# +--[INTERNAL API::PUBLIC API HELPERS]-------------------------------------------------------------------------------+ +# These are functions called by the public API before the actual processing begins. 
+ +DSDL_FILE_SUFFIX = ".dsdl" +DSDL_FILE_GLOB = f"*{DSDL_FILE_SUFFIX}" +DSDL_FILE_SUFFIX_LEGACY = ".uavcan" +DSDL_FILE_GLOB_LEGACY = f"*{DSDL_FILE_SUFFIX_LEGACY}" +_LOG_LIST_ITEM_PREFIX = " " * 4 + + +def _complete_read_function( + target_dsdl_definitions: SortedFileList, + lookup_directories_path_list: List[Path], + print_output_handler: Optional[PrintOutputHandler], + allow_unregulated_fixed_port_id: bool, +) -> DsdlDefinitions: + + lookup_dsdl_definitions = _construct_dsdl_definitions_from_namespaces(lookup_directories_path_list) # Check for collisions against the lookup definitions also. _ensure_no_collisions(target_dsdl_definitions, lookup_dsdl_definitions) @@ -177,8 +277,9 @@ def read_namespace( ", ".join(set(sorted(map(lambda t: t.root_namespace, lookup_dsdl_definitions)))), ) - # Read the constructed definitions. - types = _read_namespace_definitions( + # This is the biggie. All the rest of the wranging is just to get to this point. This will take the + # most time and memory. + definitions = read_definitions( target_dsdl_definitions, lookup_dsdl_definitions, print_output_handler, allow_unregulated_fixed_port_id ) @@ -188,57 +289,103 @@ def read_namespace( # directories may contain issues and mistakes that are outside of the control of the user (e.g., # they could be managed by a third party) -- the user shouldn't be affected by mistakes committed # by the third party. 
- _ensure_no_fixed_port_id_collisions(types) - _ensure_minor_version_compatibility(types) + _ensure_no_fixed_port_id_collisions(definitions.direct) + _ensure_minor_version_compatibility(definitions.transitive + definitions.direct) - return types + return definitions -DSDL_FILE_GLOB = "*.dsdl" -DSDL_FILE_GLOB_LEGACY = "*.uavcan" -_LOG_LIST_ITEM_PREFIX = " " * 4 +def _construct_lookup_directories_path_list( + root_namespace_directories: Iterable[Path], + lookup_directories_path_list: List[Path], + allow_root_namespace_name_collision: bool, +) -> List[Path]: + """ + Intermediate transformation and validation of inputs into a list of lookup directories as paths. -_logger = logging.getLogger(__name__) + :param root_namespace_directory: The path of the root namespace directory that will be read. + For example, ``dsdl/uavcan`` to read the ``uavcan`` namespace. + :param lookup_directories: List of other namespace directories containing data type definitions that are + referred to from the target root namespace. For example, if you are reading a vendor-specific namespace, + the list of lookup directories should always include a path to the standard root namespace ``uavcan``, + otherwise the types defined in the vendor-specific namespace won't be able to use data types from the + standard namespace. -def _read_namespace_definitions( - target_definitions: List[_dsdl_definition.DSDLDefinition], - lookup_definitions: List[_dsdl_definition.DSDLDefinition], - print_output_handler: Optional[PrintOutputHandler] = None, - allow_unregulated_fixed_port_id: bool = False, -) -> List[_serializable.CompositeType]: - """ - Construct type descriptors from the specified target definitions. - Allow the target definitions to use the lookup definitions within themselves. - :param target_definitions: Which definitions to read. - :param lookup_definitions: Which definitions can be used by the processed definitions. - :return: A list of types. 
+ :param allow_root_namespace_name_collision: Allow using the source root namespace name in the look up dirs or + the same root namespace name multiple times in the lookup dirs. This will enable defining a namespace + partially and let other entities define new messages or new sub-namespaces in the same root namespace. + + :return: A list of lookup directories as paths. + + :raises: :class:`pydsdl.FrontendError`, :class:`MemoryError`, :class:`SystemError`, + :class:`OSError` if directories do not exist or inaccessible, + :class:`ValueError`/:class:`TypeError` if the arguments are invalid. """ + # Add the own root namespace to the set of lookup directories, sort lexicographically, remove duplicates. + # We'd like this to be an iterable list of strings but we handle the common practice of passing in a single path. - def make_print_handler(definition: _dsdl_definition.DSDLDefinition) -> Callable[[int, str], None]: - def handler(line_number: int, text: str) -> None: - if print_output_handler: # pragma: no branch - assert isinstance(line_number, int) and isinstance(text, str) - assert line_number > 0, "Line numbers must be one-based" - print_output_handler(definition.file_path, line_number, text) + # Normalize paths and remove duplicates. Resolve symlinks to avoid ambiguities. + lookup_directories_path_list.extend(root_namespace_directories) + lookup_directories_path_list = list(sorted({x.resolve() for x in lookup_directories_path_list})) + _logger.debug("Lookup directories are listed below:") + for a in lookup_directories_path_list: + _logger.debug(_LOG_LIST_ITEM_PREFIX + str(a)) - return handler + # Check for common usage errors and warn the user if anything looks suspicious. 
+ _ensure_no_common_usage_errors(root_namespace_directories, lookup_directories_path_list, _logger.warning) - types = [] # type: List[_serializable.CompositeType] - for tdd in target_definitions: - try: - dt = tdd.read(lookup_definitions, make_print_handler(tdd), allow_unregulated_fixed_port_id) - except _error.FrontendError as ex: # pragma: no cover - ex.set_error_location_if_unknown(path=tdd.file_path) - raise ex - except (MemoryError, SystemError): # pragma: no cover - raise - except Exception as ex: # pragma: no cover - raise _error.InternalError(culprit=ex, path=tdd.file_path) from ex - else: - types.append(dt) + # Check the namespaces. + _ensure_no_nested_root_namespaces(lookup_directories_path_list) + + if not allow_root_namespace_name_collision: + _ensure_no_namespace_name_collisions(lookup_directories_path_list) - return types + return lookup_directories_path_list + + +def _construct_dsdl_definitions_from_files( + dsdl_files: List[Path], + valid_roots: Set[Path], +) -> SortedFileList: + """ """ + output = set() # type: Set[DsdlFileBuildable] + for fp in dsdl_files: + root_namespace_path = _infer_path_to_root(fp, valid_roots) + if fp.suffix == DSDL_FILE_SUFFIX_LEGACY: + _logger.warning( + "File uses deprecated extension %r, please rename to use %r: %s", + DSDL_FILE_SUFFIX_LEGACY, + DSDL_FILE_SUFFIX, + fp, + ) + output.add(_dsdl_definition.DSDLDefinition(fp, root_namespace_path)) + + return dsdl_file_sort(output) + + +def _construct_dsdl_definitions_from_namespaces( + root_namespace_paths: List[Path], +) -> SortedFileList: + """ + Accepts a directory path, returns a sorted list of abstract DSDL file representations. Those can be read later. + The definitions are sorted by name lexicographically, then by major version (greatest version first), + then by minor version (same ordering as the major version). 
+ """ + source_file_paths: Set[Tuple[Path, Path]] = set() # index of all file paths already found + for root_namespace_path in root_namespace_paths: + for p in root_namespace_path.rglob(DSDL_FILE_GLOB): + source_file_paths.add((p, root_namespace_path)) + for p in root_namespace_path.rglob(DSDL_FILE_GLOB_LEGACY): + source_file_paths.add((p, root_namespace_path)) + _logger.warning( + "File uses deprecated extension %r, please rename to use %r: %s", + DSDL_FILE_GLOB_LEGACY, + DSDL_FILE_GLOB, + p, + ) + + return dsdl_file_sort([_dsdl_definition.DSDLDefinition(*p) for p in source_file_paths]) def _ensure_no_collisions( @@ -375,7 +522,7 @@ def _ensure_minor_version_compatibility_pairwise( def _ensure_no_common_usage_errors( - root_namespace_directory: Path, lookup_directories: Iterable[Path], reporter: Callable[[str], None] + root_namespace_directories: Iterable[Path], lookup_directories: Iterable[Path], reporter: Callable[[str], None] ) -> None: suspicious_base_names = [ "public_regulated_data_types", @@ -391,7 +538,7 @@ def is_valid_name(s: str) -> bool: return True # resolve() will also normalize the case in case-insensitive filesystems. - all_paths = {root_namespace_directory.resolve()} | {x.resolve() for x in lookup_directories} + all_paths = {y.resolve() for y in root_namespace_directories} | {x.resolve() for x in lookup_directories} for p in all_paths: try: candidates = [x for x in p.iterdir() if x.is_dir() and is_valid_name(x.name)] @@ -435,32 +582,59 @@ def _ensure_no_namespace_name_collisions(directories: Iterable[Path]) -> None: raise RootNamespaceNameCollisionError("The name of this namespace conflicts with %s" % b, path=a) -def _construct_dsdl_definitions_from_namespace(root_namespace_path: Path) -> List[_dsdl_definition.DSDLDefinition]: +def _infer_path_to_root(dsdl_path: Path, valid_dsdl_roots_or_path_to_root: Optional[Set[Path]] = None) -> Path: """ - Accepts a directory path, returns a sorted list of abstract DSDL file representations. 
Those can be read later. - The definitions are sorted by name lexicographically, then by major version (greatest version first), - then by minor version (same ordering as the major version). + Infer the path to the namespace root of a DSDL file path. + :param dsdl_path: The path to the alleged DSDL file. + :param valid_dsdl_roots_or_path_to_root: The set of valid root names or paths under which the type must reside. + :return The path to the root namespace directory. + :raises DsdlPathInferenceError: If the namespace root cannot be inferred from the provided information. """ - source_file_paths: Set[Path] = set() - for p in root_namespace_path.rglob(DSDL_FILE_GLOB): - source_file_paths.add(p) - for p in root_namespace_path.rglob(DSDL_FILE_GLOB_LEGACY): - source_file_paths.add(p) - _logger.warning( - "File uses deprecated extension %r, please rename to use %r: %s", DSDL_FILE_GLOB_LEGACY, DSDL_FILE_GLOB, p + if dsdl_path.is_absolute(): + if valid_dsdl_roots_or_path_to_root is None: + raise DsdlPathInferenceError( + f"dsdl_path ({dsdl_path}) is absolute and no valid root names or path to root was provided. The " + "DSDL root of an absolute path cannot be inferred without this information.", + ) + if len(valid_dsdl_roots_or_path_to_root) == 0: + raise DsdlPathInferenceError( + f"dsdl_path ({dsdl_path}) is absolute and the provided valid root names are empty. 
The DSDL root of " + "an absolute path cannot be inferred without this information.", + ) + for path_to_root in valid_dsdl_roots_or_path_to_root: + try: + _ = dsdl_path.relative_to(path_to_root) + except ValueError: + continue + return path_to_root + raise DsdlPathInferenceError( + f"dsdl_path ({dsdl_path}) is absolute but is not relative to " + f"any provided path to root {valid_dsdl_roots_or_path_to_root}", ) - output = [] # type: List[_dsdl_definition.DSDLDefinition] - for fp in sorted(source_file_paths): - dsdl_def = _dsdl_definition.DSDLDefinition(fp, root_namespace_path) - output.append(dsdl_def) + if valid_dsdl_roots_or_path_to_root is not None and len(valid_dsdl_roots_or_path_to_root) > 0: + parts = list(dsdl_path.parent.parts) + namespace_parts = None + for i, part in list(enumerate(parts)): + if part in valid_dsdl_roots_or_path_to_root: + namespace_parts = parts[i:] + return Path().joinpath(*parts[: i + 1]) + # +1 to include the root folder + if namespace_parts is None: + raise DsdlPathInferenceError(f"No valid root found in path {str(dsdl_path)}") - # Lexicographically by name, newest version first. 
- return list(sorted(output, key=lambda d: (d.full_name, -d.version.major, -d.version.minor))) + if not dsdl_path.is_absolute(): + return Path(dsdl_path.parts[0]) + + raise DsdlPathInferenceError(f"Could not determine a path to the namespace root of dsdl path {dsdl_path}") + + +# +--[ UNIT TESTS ]---------------------------------------------------------------------------------------------------+ def _unittest_dsdl_definition_constructor() -> None: import tempfile + from ._dsdl_definition import FileNameFormatError with tempfile.TemporaryDirectory() as directory: @@ -472,7 +646,7 @@ def _unittest_dsdl_definition_constructor() -> None: (root / "nested/2.Asd.21.32.dsdl").write_text("# TEST B") (root / "nested/Foo.32.43.dsdl").write_text("# TEST C") - dsdl_defs = _construct_dsdl_definitions_from_namespace(root) + dsdl_defs = _construct_dsdl_definitions_from_namespaces([root]) print(dsdl_defs) lut = {x.full_name: x for x in dsdl_defs} # type: Dict[str, _dsdl_definition.DSDLDefinition] assert len(lut) == 3 @@ -528,7 +702,7 @@ def _unittest_dsdl_definition_constructor() -> None: (root / "nested/Malformed.MAJOR.MINOR.dsdl").touch() try: - _construct_dsdl_definitions_from_namespace(root) + _construct_dsdl_definitions_from_namespaces([root]) except FileNameFormatError as ex: print(ex) (root / "nested/Malformed.MAJOR.MINOR.dsdl").unlink() @@ -537,7 +711,7 @@ def _unittest_dsdl_definition_constructor() -> None: (root / "nested/NOT_A_NUMBER.Malformed.1.0.dsdl").touch() try: - _construct_dsdl_definitions_from_namespace(root) + _construct_dsdl_definitions_from_namespaces([root]) except FileNameFormatError as ex: print(ex) (root / "nested/NOT_A_NUMBER.Malformed.1.0.dsdl").unlink() @@ -546,26 +720,26 @@ def _unittest_dsdl_definition_constructor() -> None: (root / "nested/Malformed.dsdl").touch() try: - _construct_dsdl_definitions_from_namespace(root) + _construct_dsdl_definitions_from_namespaces([root]) except FileNameFormatError as ex: print(ex) (root / 
"nested/Malformed.dsdl").unlink() else: # pragma: no cover assert False - _construct_dsdl_definitions_from_namespace(root) # making sure all errors are cleared + _construct_dsdl_definitions_from_namespaces([root]) # making sure all errors are cleared (root / "nested/super.bad").mkdir() (root / "nested/super.bad/Unreachable.1.0.dsdl").touch() try: - _construct_dsdl_definitions_from_namespace(root) + _construct_dsdl_definitions_from_namespaces([root]) except FileNameFormatError as ex: print(ex) else: # pragma: no cover assert False try: - _construct_dsdl_definitions_from_namespace(root / "nested/super.bad") + _construct_dsdl_definitions_from_namespaces([root / "nested/super.bad"]) except FileNameFormatError as ex: print(ex) else: # pragma: no cover @@ -582,7 +756,7 @@ def _unittest_dsdl_definition_constructor_legacy() -> None: root = di / "foo" root.mkdir() (root / "123.Qwerty.123.234.uavcan").write_text("# TEST A") - dsdl_defs = _construct_dsdl_definitions_from_namespace(root) + dsdl_defs = _construct_dsdl_definitions_from_namespaces([root]) print(dsdl_defs) lut = {x.full_name: x for x in dsdl_defs} # type: Dict[str, _dsdl_definition.DSDLDefinition] assert len(lut) == 1 @@ -609,33 +783,34 @@ def _unittest_common_usage_errors() -> None: reports = [] # type: List[str] - _ensure_no_common_usage_errors(root_ns_dir, [], reports.append) + _ensure_no_common_usage_errors([root_ns_dir], [], reports.append) assert not reports - _ensure_no_common_usage_errors(root_ns_dir, [di / "baz"], reports.append) + _ensure_no_common_usage_errors([root_ns_dir], [di / "baz"], reports.append) assert not reports dir_dsdl = root_ns_dir / "dsdl" dir_dsdl.mkdir() - _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + _ensure_no_common_usage_errors([dir_dsdl], [di / "baz"], reports.append) assert not reports # Because empty. 
dir_dsdl_vscode = dir_dsdl / ".vscode" dir_dsdl_vscode.mkdir() - _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + _ensure_no_common_usage_errors([dir_dsdl], [di / "baz"], reports.append) assert not reports # Because the name is not valid. dir_dsdl_uavcan = dir_dsdl / "uavcan" dir_dsdl_uavcan.mkdir() - _ensure_no_common_usage_errors(dir_dsdl, [di / "baz"], reports.append) + _ensure_no_common_usage_errors([dir_dsdl], [di / "baz"], reports.append) (rep,) = reports reports.clear() assert str(dir_dsdl_uavcan.resolve()).lower() in rep.lower() def _unittest_nested_roots() -> None: - from pytest import raises import tempfile + from pytest import raises + with tempfile.TemporaryDirectory() as directory: di = Path(directory) (di / "a").mkdir() @@ -663,3 +838,59 @@ def _unittest_issue_71() -> None: # https://github.com/OpenCyphal/pydsdl/issues (real / "Msg.0.1.dsdl").write_text("@sealed") assert len(read_namespace(real, [real, link])) == 1 assert len(read_namespace(link, [real, link])) == 1 + + +def _unittest_type_from_path_inference() -> None: + from pytest import raises as expect_raises + + # To determine the namespace do + + dsdl_file = Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl") + path_to_root = _infer_path_to_root(dsdl_file, {"uavcan"}) + namespace_parts = dsdl_file.parent.relative_to(path_to_root.parent).parts + + assert path_to_root == Path("/repo/uavcan") + assert namespace_parts == ("uavcan", "foo", "bar") + + # The root namespace cannot be inferred in an absolute path without additional data: + + with expect_raises(DsdlPathInferenceError): + _ = _infer_path_to_root(Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl")) + + # If an absolute path is provided along with a path-to-root "hint" then the former must be relative to the + # latter: + + # dsdl file path is not contained within the root path + with expect_raises(DsdlPathInferenceError): + _ = _infer_path_to_root(Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl"), {Path("/not-a-repo")}) + + # This 
works + root = _infer_path_to_root(Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl"), {Path("/repo")}) + assert root == Path("/repo") + + # Either relative or absolute paths given a set of valid root names will prefer searching for the root: + + valid_roots = {"uavcan", "cyphal"} + + # absolute dsdl path using valid roots + root = _infer_path_to_root(Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl"), valid_roots) + assert root == Path("/repo/uavcan") + + # relative dsdl path using valid roots + root = _infer_path_to_root(Path("repo/uavcan/foo/bar/435.baz.1.0.dsdl"), valid_roots) + assert root == Path("repo/uavcan") + + # absolute dsdl path using valid roots but an invalid file path + with expect_raises(DsdlPathInferenceError): + _ = _infer_path_to_root(Path("/repo/crap/foo/bar/435.baz.1.0.dsdl"), valid_roots) + + # relative dsdl path using valid roots but an invalid file path + with expect_raises(DsdlPathInferenceError): + _ = _infer_path_to_root(Path("repo/crap/foo/bar/435.baz.1.0.dsdl"), valid_roots) + + # The final inference made is when relative dsdl paths are provided with no additional information. In this + # case the method assumes that the relative path is the correct and complete namespace of the type: + + # relative path + root = _infer_path_to_root(Path("uavcan/foo/bar/435.baz.1.0.dsdl")) + assert root == Path("uavcan") diff --git a/pydsdl/_namespace_reader.py b/pydsdl/_namespace_reader.py new file mode 100644 index 0000000..a74164a --- /dev/null +++ b/pydsdl/_namespace_reader.py @@ -0,0 +1,272 @@ +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. 
+# SPDX-License-Identifier: MIT + + +import functools +import logging +from pathlib import Path +from typing import NamedTuple, Optional, Set + +from ._dsdl import DsdlFile, DsdlFileBuildable, PrintOutputHandler, SortedFileList +from ._dsdl import file_sort as dsdl_file_sort +from ._error import FrontendError, InternalError + + +def _read_definitions( + target_definitions: SortedFileList, + lookup_definitions: SortedFileList, + print_output_handler: Optional[PrintOutputHandler], + allow_unregulated_fixed_port_id: bool, + direct: Set[DsdlFileBuildable], + transitive: Set[DsdlFileBuildable], + level: int, +) -> None: + """ + Don't look at me! I'm hideous! + (recursive method with a lot of arguments. See read_definitions for documentation) + """ + + _pending_definitions: Set[DsdlFileBuildable] = set() + + def callback(_: DsdlFile, dependent_type: DsdlFileBuildable) -> None: + _pending_definitions.add(dependent_type) + + def print_handler(file: Path, line: int, message: str) -> None: + if print_output_handler is not None: + print_output_handler(file, line, message) + + for target_definition in target_definitions: + _pending_definitions.clear() + + if target_definition in (direct, transitive): + logging.debug("Skipping target file %s because it has already been processed", target_definition.file_path) + continue + + if not isinstance(target_definition, DsdlFileBuildable): + raise TypeError("Expected DsdlFileBuildable, got: " + type(target_definition).__name__) + + try: + + target_definition.read( + lookup_definitions, + [callback], + functools.partial(print_handler, target_definition.file_path), + allow_unregulated_fixed_port_id, + ) + except FrontendError as ex: # pragma: no cover + ex.set_error_location_if_unknown(path=target_definition.file_path) + raise ex + except Exception as ex: # pragma: no cover + raise InternalError(culprit=ex, path=target_definition.file_path) from ex + + if level == 0: + direct.add(target_definition) + try: + 
transitive.remove(target_definition) + except KeyError: + pass + elif target_definition not in direct: + transitive.add(target_definition) + + _read_definitions( + dsdl_file_sort(_pending_definitions), + lookup_definitions, + print_output_handler, + allow_unregulated_fixed_port_id, + direct, + transitive, + level + 1, + ) + + +# +---[FILE: PUBLIC]--------------------------------------------------------------------------------------------------+ + +DsdlDefinitions = NamedTuple("DsdlDefinitions", [("direct", SortedFileList), ("transitive", SortedFileList)]) +""" +Common DSDL definition set including the direct dependencies requested and the transitive dependencies found. The former +and latter sets will be disjoint. +""" + + +def read_definitions( + target_definitions: SortedFileList, + lookup_definitions: SortedFileList, + print_output_handler: Optional[PrintOutputHandler], + allow_unregulated_fixed_port_id: bool, +) -> DsdlDefinitions: + """ + Given a set of DSDL files, this method reads the text and invokes the parser for each and for any files found in the + lookup set where these are used by the target set. + + :param target_definitions: List of definitions to read. + :param lookup_definitions: List of definitions available for referring to. + :param print_output_handler: Used for @print and for diagnostics: (line_number, text) -> None. + :param allow_unregulated_fixed_port_id: Do not complain about fixed unregulated port IDs. + :return: The data type representation. + :raises InvalidDefinitionError: If a dependency is missing. + :raises InternalError: If an unexpected error occurs. 
+ """ + _direct: Set[DsdlFileBuildable] = set() + _transitive: Set[DsdlFileBuildable] = set() + _read_definitions( + target_definitions, + lookup_definitions, + print_output_handler, + allow_unregulated_fixed_port_id, + _direct, + _transitive, + 0, + ) + return DsdlDefinitions( + dsdl_file_sort([d.get_composite_type() for d in _direct]), + dsdl_file_sort([t.get_composite_type() for t in _transitive]), + ) + + +# +-[UNIT TESTS]------------------------------------------------------------------------------------------------------+ + + +def _unittest_namespace_reader_read_definitions(temp_dsdl_factory) -> None: # type: ignore + from . import _dsdl_definition + + target = temp_dsdl_factory.new_file(Path("root", "ns", "Target.1.1.dsdl"), "@sealed") + target_definitions = [_dsdl_definition.DSDLDefinition(target, target.parent)] + lookup_definitions: list = [] + + read_definitions(target_definitions, lookup_definitions, None, True) + + +def _unittest_namespace_reader_read_definitions_multiple(temp_dsdl_factory) -> None: # type: ignore + from . 
import _dsdl_definition + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Target.1.1.dsdl"), "@sealed\nns.Aisle.1.0 paper_goods\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "Target.2.0.dsdl"), "@sealed\nns.Aisle.2.0 paper_goods\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "Walmart.2.4.dsdl"), "@sealed\nns.Aisle.1.0 paper_goods\n"), + ] + aisles = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Aisle.1.0.dsdl"), "@sealed"), + temp_dsdl_factory.new_file(Path("root", "ns", "Aisle.2.0.dsdl"), "@sealed"), + temp_dsdl_factory.new_file(Path("root", "ns", "Aisle.3.0.dsdl"), "@sealed"), + ] + + definitions = read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [_dsdl_definition.DSDLDefinition(a, a.parent) for a in aisles], + None, + True, + ) + + assert len(definitions.direct) == 3 + assert len(definitions.transitive) == 2 + + +def _unittest_namespace_reader_read_definitions_multiple_no_load(temp_dsdl_factory) -> None: # type: ignore + """ + Ensure that the loader does not load files that are not in the transitive closure of the target files. + """ + from . 
import _dsdl_definition + from pytest import raises as assert_raises + from ._error import InvalidDefinitionError + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Adams.1.0.dsdl"), "@sealed\nns.Tacoma.1.0 volcano\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "Hood.1.0.dsdl"), "@sealed\nns.Rainer.1.0 volcano\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "StHelens.2.1.dsdl"), "@sealed\nns.Baker.1.0 volcano\n"), + ] + dependencies = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Tacoma.1.0.dsdl"), "@sealed"), + temp_dsdl_factory.new_file(Path("root", "ns", "Rainer.1.0.dsdl"), "@sealed"), + temp_dsdl_factory.new_file(Path("root", "ns", "Baker.1.0.dsdl"), "@sealed"), + Path( + "root", "ns", "Shasta.1.0.dsdl" + ), # since this isn't in the transitive closure of target dependencies it will + # never be read thus it will not be an error that it does not exist. + ] + + target_definitions = [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets] + lookup_definitions = [_dsdl_definition.DSDLDefinition(a, a.parent) for a in dependencies] + _ = read_definitions( + target_definitions, + lookup_definitions, + None, + True, + ) + + # make sure Shasta.1.0 was never accessed but Tacoma 1.0 was + with assert_raises(InvalidDefinitionError): + _ = lookup_definitions[-1].get_composite_type() + assert lookup_definitions[0].composite_type is not None + + +def _unittest_namespace_reader_read_definitions_missing_dependency(temp_dsdl_factory) -> None: # type: ignore + """ + Verify that an error is raised when a dependency is missing. + """ + from pytest import raises as assert_raises + + from . 
import _dsdl_definition + from ._data_type_builder import UndefinedDataTypeError + + with assert_raises(UndefinedDataTypeError): + read_definitions( + [ + _dsdl_definition.DSDLDefinition( + f := temp_dsdl_factory.new_file( + Path("root", "ns", "Cat.1.0.dsdl"), "@sealed\nns.Birman.1.0 fluffy\n" + ), + f.parent, + ) + ], + [], + None, + True, + ) + + +def _unittest_namespace_reader_read_definitions_target_in_lookup(temp_dsdl_factory) -> None: # type: ignore + """ + Ensure the direct and transitive sets are disjoint. + """ + from . import _dsdl_definition + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "Ontario.1.0.dsdl"), "@sealed\nns.NewBrunswick.1.0 place\n"), + temp_dsdl_factory.new_file(Path("root", "ns", "NewBrunswick.1.0.dsdl"), "@sealed"), + ] + lookup = [ + temp_dsdl_factory.new_file(Path("root", "ns", "NewBrunswick.1.0.dsdl"), "@sealed"), + ] + + definitions = read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [_dsdl_definition.DSDLDefinition(l, l.parent) for l in lookup], + None, + True, + ) + + assert len(definitions.direct) == 2 + assert len(definitions.transitive) == 0 + + +def _unittest_namespace_reader_read_defs_target_dont_allow_unregulated(temp_dsdl_factory) -> None: # type: ignore + """ + Ensure that an error is raised when an invalid, fixed port ID is used without an override. + """ + from pytest import raises as assert_raises + + from . 
import _dsdl_definition + from ._data_type_builder import UnregulatedFixedPortIDError + + targets = [ + temp_dsdl_factory.new_file(Path("root", "ns", "845.Lice.1.0.dsdl"), "@sealed\n"), + ] + + with assert_raises(UnregulatedFixedPortIDError): + read_definitions( + [_dsdl_definition.DSDLDefinition(t, t.parent) for t in targets], + [], + None, + False, + ) diff --git a/pydsdl/_parser.py b/pydsdl/_parser.py index b73a533..e1de281 100644 --- a/pydsdl/_parser.py +++ b/pydsdl/_parser.py @@ -9,8 +9,8 @@ import fractions from pathlib import Path from typing import List, Tuple -import parsimonious -from parsimonious.nodes import Node as _Node +import parsimonious # type: ignore +from parsimonious.nodes import Node as _Node # type: ignore from . import _error from . import _serializable from . import _expression @@ -27,14 +27,14 @@ def parse(text: str, statement_stream_processor: "StatementStreamProcessor") -> """ pr = _ParseTreeProcessor(statement_stream_processor) try: - pr.visit(_get_grammar().parse(text)) # type: ignore + pr.visit(_get_grammar().parse(text)) except _error.FrontendError as ex: # Inject error location. If this exception is being propagated from a recursive instance, it already has # its error location populated, so nothing will happen here. 
ex.set_error_location_if_unknown(line=pr.current_line_number) raise ex except parsimonious.ParseError as ex: - raise DSDLSyntaxError("Syntax error", line=int(ex.line())) from None # type: ignore + raise DSDLSyntaxError("Syntax error", line=int(ex.line())) from None except parsimonious.VisitationError as ex: # pragma: no cover # noinspection PyBroadException try: @@ -89,7 +89,7 @@ def resolve_versioned_data_type(self, name: str, version: _serializable.Version) @functools.lru_cache(None) def _get_grammar() -> parsimonious.Grammar: - return parsimonious.Grammar((Path(__file__).parent / "grammar.parsimonious").read_text()) # type: ignore + return parsimonious.Grammar((Path(__file__).parent / "grammar.parsimonious").read_text()) _logger = logging.getLogger(__name__) @@ -131,7 +131,7 @@ class _ParseTreeProcessor(parsimonious.NodeVisitor): # Intentional exceptions that shall not be treated as parse errors. # Beware that those might be propagated from recursive parser instances! - unwrapped_exceptions = (_error.FrontendError, SystemError, MemoryError, SystemExit) # type: ignore + unwrapped_exceptions = (_error.FrontendError, SystemError, MemoryError, SystemExit) def __init__(self, statement_stream_processor: StatementStreamProcessor): assert isinstance(statement_stream_processor, StatementStreamProcessor) diff --git a/pydsdl/_serializable/_composite.py b/pydsdl/_serializable/_composite.py index 6ed71da..4e21f3c 100644 --- a/pydsdl/_serializable/_composite.py +++ b/pydsdl/_serializable/_composite.py @@ -97,9 +97,31 @@ def __init__( # pylint: disable=too-many-arguments "Name is too long: %r is longer than %d characters" % (self._name, self.MAX_NAME_LENGTH) ) - for component in self._name.split(self.NAME_COMPONENT_SEPARATOR): + self._name_components = self._name.split(self.NAME_COMPONENT_SEPARATOR) + for component in self._name_components: check_name(component) + def search_up_for_root(path: Path, namespace_components: typing.List[str]) -> Path: + if len(namespace_components) 
== 0: + raise InvalidNameError( + "Path to file without a namespace. All dsdl files must be contained within " + f"folders corresponding to their namespaces ({self._source_file_path})" + ) + if namespace_components[-1] != path.stem: + raise InvalidNameError( + f"{path.stem} != {namespace_components[-1]}. Source file directory structure " + f"is not consistent with the type's namespace ({self._name_components}, " + f"{self._source_file_path})" + ) + if len(namespace_components) == 1: + return path + return search_up_for_root(path.parent, namespace_components[:-1]) + + self._path_to_root_namespace = search_up_for_root( + self._source_file_path.parent, + (self.namespace_components if not self._has_parent_service else self.namespace_components[:-1]), + ) + # Version check version_valid = ( (0 <= self._version.major <= self.MAX_VERSION_NUMBER) @@ -148,7 +170,12 @@ def full_name(self) -> str: @property def name_components(self) -> typing.List[str]: """Components of the full name as a list, e.g., ``['uavcan', 'node', 'Heartbeat']``.""" - return self._name.split(CompositeType.NAME_COMPONENT_SEPARATOR) + return self._name_components + + @property + def namespace_components(self) -> typing.List[str]: + """Components of the namespace as a list, e.g., ``['uavcan', 'node']``.""" + return self._name_components[:-1] @property def short_name(self) -> str: @@ -163,7 +190,7 @@ def doc(self) -> str: @property def full_namespace(self) -> str: """The full name without the short name, e.g., ``uavcan.node`` for ``uavcan.node.Heartbeat``.""" - return str(CompositeType.NAME_COMPONENT_SEPARATOR.join(self.name_components[:-1])) + return str(CompositeType.NAME_COMPONENT_SEPARATOR.join(self.namespace_components)) @property def root_namespace(self) -> str: @@ -239,10 +266,30 @@ def has_fixed_port_id(self) -> bool: @property def source_file_path(self) -> Path: """ - For synthesized types such as service request/response sections, this property is defined as an empty string. 
+ The path to the dsdl file from which this type was read. + For synthesized types such as service request/response sections, this property is the path to the service type + since request and response types are defined within the service type's dsdl file. """ return self._source_file_path + + @property + def source_file_path_to_root(self) -> Path: + """ + The path to the folder that is the root namespace folder for the `source_file_path` this type was read from. + The `source_file_path` will always be relative to the `source_file_path_to_root` but not all types that share + the same `root_namespace` will have the same path to their root folder since types may be contributed to a + root namespace from several different file trees. For example: + + ``` + path0 = "workspace_0/project_a/types/animal/feline/Tabby.1.0.dsdl" + path1 = "workspace_1/project_b/types/animal/canine/Boxer.1.0.dsdl" + ``` + + In these examples path0 and path1 will produce composite types with `animal` as the root namespace but both + will have different `source_file_path_to_root` paths. + """ + return self._path_to_root_namespace + + @property + def alignment_requirement(self) -> int: + # This is more general than required by the Specification, but it is done this way in case if we decided @@ -681,6 +728,9 @@ def iterate_fields_with_offsets( + raise TypeError("Service types do not have serializable fields. 
Use either request or response.") +# +--[UNIT TESTS]-----------------------------------------------------------------------------------------------------+ + + def _unittest_composite_types() -> None: # pylint: disable=too-many-statements from pytest import raises from ._primitive import SignedIntegerType, FloatType @@ -693,7 +743,7 @@ def try_name(name: str) -> CompositeType: attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path(*name.split(".")), has_parent_service=False, ) @@ -733,7 +783,7 @@ def try_name(name: str) -> CompositeType: attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a", "A"), has_parent_service=False, ) @@ -748,7 +798,7 @@ def try_name(name: str) -> CompositeType: ], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a", "A"), has_parent_service=False, ) @@ -762,7 +812,7 @@ def try_name(name: str) -> CompositeType: ], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("uavcan", "node", "Heartbeat"), has_parent_service=False, ) assert u["a"].name == "a" @@ -788,7 +838,7 @@ def try_name(name: str) -> CompositeType: ], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a", "A"), has_parent_service=False, ) assert s["a"].name == "a" @@ -847,7 +897,7 @@ def try_union_fields(field_types: typing.List[SerializableType]) -> UnionType: attributes=atr, deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a") / "A", has_parent_service=False, ) @@ -912,7 +962,7 @@ def try_struct_fields(field_types: typing.List[SerializableType]) -> StructureTy attributes=atr, deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("a") / "A", has_parent_service=False, ) @@ -960,7 +1010,7 @@ def make_type(meta: typing.Type[CompositeType], attributes: typing.Iterable[Attr 
attributes=attributes, deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("fake_root") / "ns" / f"Type{str(_seq_no)}", has_parent_service=False, ) @@ -1228,7 +1278,7 @@ def validate_iterator( attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("ns", "S_1_0.dsdl"), has_parent_service=True, ), response=StructureType( @@ -1237,7 +1287,7 @@ def validate_iterator( attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("ns", "S_1_0.dsdl"), has_parent_service=True, ), fixed_port_id=None, @@ -1251,7 +1301,7 @@ def validate_iterator( attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("ns", "XX_1_0.dsdl"), has_parent_service=True, ), response=StructureType( @@ -1260,8 +1310,31 @@ def validate_iterator( attributes=[], deprecated=True, fixed_port_id=None, - source_file_path=Path(), - has_parent_service=False, + source_file_path=Path("ns", "XX_1_0.dsdl"), + has_parent_service=True, + ), + fixed_port_id=None, + ) + + with raises(ValueError): # Request/response consistency error (internal failure) + ServiceType( + request=StructureType( + name="ns.XX.Request", + version=Version(1, 0), + attributes=[], + deprecated=False, + fixed_port_id=None, + source_file_path=Path("ns", "XX_1_0.dsdl"), + has_parent_service=True, + ), + response=StructureType( + name="ns.XX.Response", + version=Version(1, 0), + attributes=[], + deprecated=False, + fixed_port_id=None, + source_file_path=Path("ns", "YY_1_0.dsdl"), + has_parent_service=True, ), fixed_port_id=None, ) @@ -1273,7 +1346,7 @@ def validate_iterator( attributes=[], deprecated=False, fixed_port_id=None, - source_file_path=Path(), + source_file_path=Path("e", "E_0_1.dsdl"), has_parent_service=False, ) validate_iterator(e, []) diff --git a/pydsdl/_test.py b/pydsdl/_test.py index 3e4048f..2a879c3 100644 --- a/pydsdl/_test.py +++ b/pydsdl/_test.py @@ 
-62,12 +62,13 @@ def parse_definition( ) -> _serializable.CompositeType: return definition.read( lookup_definitions, + [], print_output_handler=lambda line, text: print("Output from line %d:" % line, text), allow_unregulated_fixed_port_id=False, ) -@pytest.fixture() # type: ignore +@pytest.fixture() def wrkspc() -> Workspace: return Workspace() @@ -422,7 +423,7 @@ def _unittest_error(wrkspc: Workspace) -> None: def standalone(rel_path: str, definition: str, allow_unregulated: bool = False) -> _serializable.CompositeType: return wrkspc.parse_new(rel_path, definition + "\n").read( - [], lambda *_: None, allow_unregulated + [], [], lambda *_: None, allow_unregulated ) # pragma: no branch with raises(_error.InvalidDefinitionError, match="(?i).*port ID.*"): @@ -754,20 +755,20 @@ def print_handler(line_number: int, text: str) -> None: wrkspc.parse_new( "ns/A.1.0.dsdl", "# line number 1\n" "# line number 2\n" "@print 2 + 2 == 4 # line number 3\n" "# line number 4\n" "@sealed\n", - ).read([], print_handler, False) + ).read([], [], print_handler, False) assert printed_items assert printed_items[0] == 3 assert printed_items[1] == "true" - wrkspc.parse_new("ns/B.1.0.dsdl", "@print false\n@sealed").read([], print_handler, False) + wrkspc.parse_new("ns/B.1.0.dsdl", "@print false\n@sealed").read([], [], print_handler, False) assert printed_items assert printed_items[0] == 1 assert printed_items[1] == "false" wrkspc.parse_new( "ns/Offset.1.0.dsdl", "@print _offset_ # Not recorded\n" "uint8 a\n" "@print _offset_\n" "@extent 800\n" - ).read([], print_handler, False) + ).read([], [], print_handler, False) assert printed_items assert printed_items[0] == 3 assert printed_items[1] == "{8}" diff --git a/setup.cfg b/setup.cfg index 8d934ba..c58e783 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,7 +35,7 @@ include = pydsdl* # -------------------------------------------------- PYTEST -------------------------------------------------- [tool:pytest] -testpaths = pydsdl +testpaths = pydsdl 
test norecursedirs = third_party python_files = *.py python_classes = _UnitTest diff --git a/test/public_regulated_data_types b/test/public_regulated_data_types new file mode 160000 index 0000000..f9f6790 --- /dev/null +++ b/test/public_regulated_data_types @@ -0,0 +1 @@ +Subproject commit f9f67906cc0ca5d7c1b429924852f6b28f313cbf diff --git a/test/test_public_types.py b/test/test_public_types.py new file mode 100644 index 0000000..8463f97 --- /dev/null +++ b/test/test_public_types.py @@ -0,0 +1,47 @@ +# Copyright (C) OpenCyphal Development Team +# Copyright Amazon.com Inc. or its affiliates. +# SPDX-License-Identifier: MIT + +# pylint: disable=redefined-outer-name +# pylint: disable=logging-fstring-interpolation +import cProfile +import io +import pstats +from pathlib import Path +from pstats import SortKey + +import pydsdl + + +def _unittest_public_types_namespaces(public_types: Path) -> None: + """ + Sanity check to ensure that the public types can be read. This also allows us to debug + against a real dataset. + """ + pr = cProfile.Profile() + pr.enable() + _ = pydsdl.read_namespace(public_types) + pr.disable() + s = io.StringIO() + sortby = SortKey.TIME + ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + ps.print_stats() + print(s.getvalue()) + + +def _unittest_public_types_files(public_types: Path) -> None: + """ + Sanity check to ensure that the public types can be read. This also allows us to debug + against a real dataset. + """ + pr = cProfile.Profile() + pr.enable() + node_types = list(public_types.glob("node/**/*.dsdl")) + assert len(node_types) > 0 + _ = pydsdl.read_files(node_types, {public_types}) + pr.disable() + s = io.StringIO() + sortby = SortKey.TIME + ps = pstats.Stats(pr, stream=s).sort_stats(sortby) + ps.print_stats() + print(s.getvalue())