diff --git a/pydsdl/_data_type_builder.py b/pydsdl/_data_type_builder.py index 8a2d630..96128d0 100644 --- a/pydsdl/_data_type_builder.py +++ b/pydsdl/_data_type_builder.py @@ -2,12 +2,13 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko +from __future__ import annotations import logging from pathlib import Path -from typing import Callable, Iterable, Optional +from typing import Callable, Iterable from . import _data_schema_builder, _error, _expression, _parser, _port_id_ranges, _serializable -from ._dsdl import DefinitionVisitor, DsdlFileBuildable +from ._dsdl import DefinitionVisitor, ReadableDSDLFile class AssertionCheckFailureError(_error.InvalidDefinitionError): @@ -42,8 +43,8 @@ class DataTypeBuilder(_parser.StatementStreamProcessor): # pylint: disable=too-many-arguments def __init__( self, - definition: DsdlFileBuildable, - lookup_definitions: Iterable[DsdlFileBuildable], + definition: ReadableDSDLFile, + lookup_definitions: Iterable[ReadableDSDLFile], definition_visitors: Iterable[DefinitionVisitor], print_output_handler: Callable[[int, str], None], allow_unregulated_fixed_port_id: bool, @@ -53,10 +54,10 @@ def __init__( self._definition_visitors = definition_visitors self._print_output_handler = print_output_handler self._allow_unregulated_fixed_port_id = allow_unregulated_fixed_port_id - self._element_callback = None # type: Optional[Callable[[str], None]] + self._element_callback = None # type: Callable[[str], None] | None - assert isinstance(self._definition, DsdlFileBuildable) - assert all(map(lambda x: isinstance(x, DsdlFileBuildable), lookup_definitions)) + assert isinstance(self._definition, ReadableDSDLFile) + assert all(map(lambda x: isinstance(x, ReadableDSDLFile), lookup_definitions)) assert callable(self._print_output_handler) assert isinstance(self._allow_unregulated_fixed_port_id, bool) @@ -148,7 +149,7 @@ def on_padding_field(self, padding_field_type: _serializable.VoidType) -> None: ) def on_directive( - self, line_number: int, directive_name: str, associated_expression_value: Optional[_expression.Any] + self, line_number: int, directive_name: str, associated_expression_value: _expression.Any | None ) -> None: try: handler = { @@ -224,7 +225,7 @@ def resolve_versioned_data_type(self, name: str, version: _serializable.Version) for visitor in self._definition_visitors: visitor.on_definition(self._definition, target_definition) - assert isinstance(target_definition, DsdlFileBuildable) + assert isinstance(target_definition, ReadableDSDLFile) assert target_definition.full_name == full_name assert target_definition.version == version # Recursion is cool. @@ -252,7 +253,7 @@ def _on_attribute(self) -> None: "This is to prevent errors if the extent is dependent on the bit length set of the data schema." ) - def _on_print_directive(self, line_number: int, value: Optional[_expression.Any]) -> None: + def _on_print_directive(self, line_number: int, value: _expression.Any | None) -> None: _logger.info( "Print directive at %s:%d%s", self._definition.file_path, @@ -261,7 +262,7 @@ def _on_print_directive(self, line_number: int, value: Optional[_expression.Any] ) self._print_output_handler(line_number, str(value if value is not None else "")) - def _on_assert_directive(self, line_number: int, value: Optional[_expression.Any]) -> None: + def _on_assert_directive(self, line_number: int, value: _expression.Any | None) -> None: if isinstance(value, _expression.Boolean): if not value.native_value: raise AssertionCheckFailureError( @@ -273,7 +274,7 @@ def _on_assert_directive(self, line_number: int, value: Optional[_expression.Any else: raise InvalidDirectiveError("The assertion check expression must yield a boolean, not %s" % value.TYPE_NAME) - def _on_extent_directive(self, line_number: int, value: Optional[_expression.Any]) -> None: + def _on_extent_directive(self, line_number: int, value: _expression.Any | None) -> None: if self._structs[-1].serialization_mode is not None: raise InvalidDirectiveError( "Misplaced extent directive. The serialization mode is already set to %s" @@ -289,7 +290,7 @@ def _on_extent_directive(self, line_number: int, value: Optional[_expression.Any else: raise InvalidDirectiveError("The extent directive expects a rational, not %s" % value.TYPE_NAME) - def _on_sealed_directive(self, _ln: int, value: Optional[_expression.Any]) -> None: + def _on_sealed_directive(self, _ln: int, value: _expression.Any | None) -> None: if self._structs[-1].serialization_mode is not None: raise InvalidDirectiveError( "Misplaced sealing directive. The serialization mode is already set to %s" @@ -299,7 +300,7 @@ def _on_sealed_directive(self, _ln: int, value: Optional[_expression.Any]) -> No raise InvalidDirectiveError("The sealed directive does not expect an expression") self._structs[-1].set_serialization_mode(_data_schema_builder.SealedSerializationMode()) - def _on_union_directive(self, _ln: int, value: Optional[_expression.Any]) -> None: + def _on_union_directive(self, _ln: int, value: _expression.Any | None) -> None: if value is not None: raise InvalidDirectiveError("The union directive does not expect an expression") if self._structs[-1].union: @@ -308,7 +309,7 @@ def _on_union_directive(self, _ln: int, value: Optional[_expression.Any]) -> Non raise InvalidDirectiveError("The union directive must be placed before the first " "attribute definition") self._structs[-1].make_union() - def _on_deprecated_directive(self, _ln: int, value: Optional[_expression.Any]) -> None: + def _on_deprecated_directive(self, _ln: int, value: _expression.Any | None) -> None: if value is not None: raise InvalidDirectiveError("The deprecated directive does not expect an expression") if self._is_deprecated: @@ -327,7 +328,7 @@ def _make_composite( # pylint: disable=too-many-arguments name: str, version: _serializable.Version, deprecated: bool, - fixed_port_id: Optional[int], + fixed_port_id: int | None, source_file_path: Path, has_parent_service: bool, ) -> _serializable.CompositeType: diff --git a/pydsdl/_dsdl.py b/pydsdl/_dsdl.py index ed6e958..9cc007a 100644 --- a/pydsdl/_dsdl.py +++ b/pydsdl/_dsdl.py @@ -2,9 +2,10 @@ # Copyright Amazon.com Inc. or its affiliates. # SPDX-License-Identifier: MIT +from __future__ import annotations from abc import ABC, abstractmethod from pathlib import Path -from typing import Any, Callable, Iterable, List, Optional, Tuple, TypeVar, Union +from typing import Any, Callable, Iterable, TypeVar, List, Tuple from ._serializable import CompositeType, Version @@ -12,7 +13,7 @@ """Invoked when the frontend encounters a print directive or needs to output a generic diagnostic.""" -class DsdlFile(ABC): +class DSDLFile(ABC): """ Interface for DSDL files. This interface is used by the parser to abstract DSDL type details inferred from the filesystem. Where properties are duplicated between the composite type and this file the composite type is to be @@ -22,7 +23,7 @@ class DsdlFile(ABC): @property @abstractmethod - def composite_type(self) -> Optional[CompositeType]: + def composite_type(self) -> CompositeType | None: """The composite type that was read from the DSDL file or None if the type has not been parsed yet.""" raise NotImplementedError() @@ -71,7 +72,7 @@ def version(self) -> Version: @property @abstractmethod - def fixed_port_id(self) -> Optional[int]: + def fixed_port_id(self) -> int | None: """Either the fixed port ID as integer, or None if not defined for this type.""" raise NotImplementedError() @@ -98,7 +99,7 @@ def root_namespace_path(self) -> Path: raise NotImplementedError() -class DsdlFileBuildable(DsdlFile): +class ReadableDSDLFile(DSDLFile): """ A DSDL file that can construct a composite type from its contents. """ @@ -106,7 +107,7 @@ class DsdlFileBuildable(DsdlFile): @abstractmethod def read( self, - lookup_definitions: Iterable["DsdlFileBuildable"], + lookup_definitions: Iterable["ReadableDSDLFile"], definition_visitors: Iterable["DefinitionVisitor"], print_output_handler: Callable[[int, str], None], allow_unregulated_fixed_port_id: bool, @@ -135,7 +136,7 @@ class DefinitionVisitor(ABC): """ @abstractmethod - def on_definition(self, target_dsdl_file: DsdlFile, dependency_dsdl_file: DsdlFileBuildable) -> None: + def on_definition(self, target_dsdl_file: DSDLFile, dependency_dsdl_file: ReadableDSDLFile) -> None: """ Called by the parser after if finds a dependent type but before it parses a file in a lookup namespace. :param target_dsdl_file: The target DSDL file that has dependencies the parser is searching for. @@ -144,12 +145,12 @@ def on_definition(self, target_dsdl_file: DsdlFile, dependency_dsdl_file: DsdlFi raise NotImplementedError() -SortedFileT = TypeVar("SortedFileT", DsdlFile, DsdlFileBuildable, CompositeType) +SortedFileT = TypeVar("SortedFileT", DSDLFile, ReadableDSDLFile, CompositeType) SortedFileList = List[SortedFileT] """A list of DSDL files sorted by name, newest version first.""" -def get_definition_ordering_rank(d: Union[DsdlFile, CompositeType]) -> Tuple[str, int, int]: +def get_definition_ordering_rank(d: DSDLFile | CompositeType) -> Tuple[str, int, int]: return d.full_name, -d.version.major, -d.version.minor @@ -160,9 +161,7 @@ def file_sort(file_list: Iterable[SortedFileT]) -> SortedFileList[SortedFileT]: return list(sorted(file_list, key=get_definition_ordering_rank)) -def normalize_paths_argument_to_list( - namespaces_or_namespace: Union[None, Path, str, Iterable[Union[Path, str]]], -) -> List[Path]: +def normalize_paths_argument_to_list(namespaces_or_namespace: None | Path | str | Iterable[Path | str]) -> List[Path]: """ Normalizes the input argument to a list of paths. """ diff --git a/pydsdl/_dsdl_definition.py b/pydsdl/_dsdl_definition.py index bbdf075..d1f15b7 100644 --- a/pydsdl/_dsdl_definition.py +++ b/pydsdl/_dsdl_definition.py @@ -2,14 +2,15 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko +from __future__ import annotations import logging import time from pathlib import Path -from typing import Callable, Iterable, List, Optional, Type +from typing import Callable, Iterable, Type from . import _parser from ._data_type_builder import DataTypeBuilder, UndefinedDataTypeError -from ._dsdl import DefinitionVisitor, DsdlFileBuildable +from ._dsdl import DefinitionVisitor, ReadableDSDLFile from ._error import FrontendError, InternalError, InvalidDefinitionError from ._serializable import CompositeType, Version @@ -25,17 +26,17 @@ def __init__(self, text: str, path: Path): super().__init__(text=text, path=Path(path)) -class DsdlPathInferenceError(UndefinedDataTypeError): +class PathInferenceError(UndefinedDataTypeError): """ Raised when the namespace, type, fixed port ID, or version cannot be inferred from a file path. """ - def __init__(self, text: str, dsdl_path: Path, valid_dsdl_roots: List[Path]): + def __init__(self, text: str, dsdl_path: Path, valid_dsdl_roots: list[Path]): super().__init__(text=text, path=Path(dsdl_path)) self.valid_dsdl_roots = valid_dsdl_roots[:] if valid_dsdl_roots is not None else None -class DSDLDefinition(DsdlFileBuildable): +class DSDLDefinition(ReadableDSDLFile): """ A DSDL type definition source abstracts the filesystem level details away, presenting a higher-level interface that operates solely on the level of type names, namespaces, fixed identifiers, and so on. @@ -47,7 +48,7 @@ class DSDLDefinition(DsdlFileBuildable): """ @classmethod - def _infer_path_to_root_from_first_found(cls, dsdl_path: Path, valid_dsdl_roots: List[Path]) -> Path: + def _infer_path_to_root_from_first_found(cls, dsdl_path: Path, valid_dsdl_roots: list[Path]) -> Path: """ See `from_first_in` for documentation on this logic. :return The path to the root namespace directory. @@ -56,7 +57,7 @@ def _infer_path_to_root_from_first_found(cls, dsdl_path: Path, valid_dsdl_roots: raise ValueError("valid_dsdl_roots was None") if dsdl_path.is_absolute() and len(valid_dsdl_roots) == 0: - raise DsdlPathInferenceError( + raise PathInferenceError( f"dsdl_path ({dsdl_path}) is absolute and the provided valid root names are empty. The DSDL root of " "an absolute path cannot be inferred without this information.", dsdl_path, @@ -95,10 +96,10 @@ def _infer_path_to_root_from_first_found(cls, dsdl_path: Path, valid_dsdl_roots: if part in root_parts: return Path().joinpath(*parts[: i + 1]) # +1 to include the root folder - raise DsdlPathInferenceError(f"No valid root found in path {str(dsdl_path)}", dsdl_path, valid_dsdl_roots) + raise PathInferenceError(f"No valid root found in path {str(dsdl_path)}", dsdl_path, valid_dsdl_roots) @classmethod - def from_first_in(cls: Type["DSDLDefinition"], dsdl_path: Path, valid_dsdl_roots: List[Path]) -> "DSDLDefinition": + def from_first_in(cls: Type["DSDLDefinition"], dsdl_path: Path, valid_dsdl_roots: list[Path]) -> "DSDLDefinition": """ Creates a DSDLDefinition object by inferring the path to the namespace root of a DSDL file given a set of valid roots. The logic used prefers an instance of dsdl_path found to exist under a valid root but @@ -110,7 +111,7 @@ def from_first_in(cls: Type["DSDLDefinition"], dsdl_path: Path, valid_dsdl_roots This argument is accepted as a list for ordering but no de-duplication is performed as the caller is expected to provide a correct set of paths. :return A new DSDLDefinition object - :raises DsdlPathInferenceError: If the namespace root cannot be inferred from the provided information. + :raises PathInferenceError: If the namespace root cannot be inferred from the provided information. """ return cls(dsdl_path, cls._infer_path_to_root_from_first_found(dsdl_path, valid_dsdl_roots)) @@ -121,7 +122,7 @@ def __init__(self, file_path: Path, root_namespace_path: Path): del file_path self._root_namespace_path = Path(root_namespace_path) del root_namespace_path - self._text: Optional[str] = None + self._text: str | None = None # Checking the sanity of the root directory path - can't contain separators if CompositeType.NAME_COMPONENT_SEPARATOR in self._root_namespace_path.name: @@ -131,7 +132,7 @@ def __init__(self, file_path: Path, root_namespace_path: Path): # Parsing the basename, e.g., 434.GetTransportStatistics.0.1.dsdl basename_components = relative_path.name.split(".")[:-1] - str_fixed_port_id: Optional[str] = None + str_fixed_port_id: str | None = None if len(basename_components) == 4: str_fixed_port_id, short_name, str_major_version, str_minor_version = basename_components elif len(basename_components) == 3: @@ -142,7 +143,7 @@ def __init__(self, file_path: Path, root_namespace_path: Path): # Parsing the fixed port ID, if specified; None if not if str_fixed_port_id is not None: try: - self._fixed_port_id: Optional[int] = int(str_fixed_port_id) + self._fixed_port_id: int | None = int(str_fixed_port_id) except ValueError: raise FileNameFormatError( "Not a valid fixed port-ID: %s. " @@ -167,14 +168,14 @@ def __init__(self, file_path: Path, root_namespace_path: Path): raise FileNameFormatError(f"Invalid name for namespace component: {nc!r}", path=self._file_path) self._name: str = CompositeType.NAME_COMPONENT_SEPARATOR.join(namespace_components + [str(short_name)]) - self._cached_type: Optional[CompositeType] = None + self._cached_type: CompositeType | None = None # +-----------------------------------------------------------------------+ - # | DsdlFileBuildable :: INTERFACE | + # | ReadableDSDLFile :: INTERFACE | # +-----------------------------------------------------------------------+ def read( self, - lookup_definitions: Iterable[DsdlFileBuildable], + lookup_definitions: Iterable[ReadableDSDLFile], definition_visitors: Iterable[DefinitionVisitor], print_output_handler: Callable[[int, str], None], allow_unregulated_fixed_port_id: bool, @@ -228,10 +229,10 @@ def read( raise InternalError(culprit=ex, path=self.file_path) from ex # +-----------------------------------------------------------------------+ - # | DsdlFile :: INTERFACE | + # | DSDLFile :: INTERFACE | # +-----------------------------------------------------------------------+ @property - def composite_type(self) -> Optional[CompositeType]: + def composite_type(self) -> CompositeType | None: return self._cached_type @property @@ -239,7 +240,7 @@ def full_name(self) -> str: return self._name @property - def name_components(self) -> List[str]: + def name_components(self) -> list[str]: return self._name.split(CompositeType.NAME_COMPONENT_SEPARATOR) @property @@ -266,7 +267,7 @@ def version(self) -> Version: return self._version @property - def fixed_port_id(self) -> Optional[int]: + def fixed_port_id(self) -> int | None: return self._fixed_port_id @property @@ -317,7 +318,7 @@ def _unittest_dsdl_definition_read_non_existent() -> None: target_definition = DSDLDefinition(target, target.parent) def print_output(line_number: int, text: str) -> None: # pragma: no cover - pass + _ = line_number, text with expect_raises(InvalidDefinitionError): target_definition.read([], [], print_output, True) @@ -330,7 +331,7 @@ def _unittest_dsdl_definition_read_text(temp_dsdl_factory) -> None: # type: ign target_file_path = Path(target_root / "Target.1.1.dsdl") dsdl_file = temp_dsdl_factory.new_file(target_root / target_file_path, "@sealed") with expect_raises(ValueError): - target_definition = DSDLDefinition(dsdl_file, target_root) + _target_definition = DSDLDefinition(dsdl_file, target_root) # we test first that we can't create the object until we have a target_root that contains the dsdl_file target_definition = DSDLDefinition(dsdl_file, dsdl_file.parent.parent) @@ -358,7 +359,7 @@ def _unittest_type_from_path_inference() -> None: # The root namespace is not inferred in an absolute path without additional data: - with expect_raises(DsdlPathInferenceError): + with expect_raises(PathInferenceError): _ = DSDLDefinition._infer_path_to_root_from_first_found( Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), [] ) @@ -372,7 +373,7 @@ def _unittest_type_from_path_inference() -> None: # latter: # dsdl file path is not contained within the root path - with expect_raises(DsdlPathInferenceError): + with expect_raises(PathInferenceError): _ = DSDLDefinition._infer_path_to_root_from_first_found( Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), [Path("/not-a-repo").resolve()] ) @@ -411,18 +412,18 @@ def _unittest_type_from_path_inference() -> None: assert root == Path("repo/uavcan") # absolute dsdl path using valid roots but an invalid file path - with expect_raises(DsdlPathInferenceError): + with expect_raises(PathInferenceError): _ = DSDLDefinition._infer_path_to_root_from_first_found( Path("/repo/crap/foo/bar/435.baz.1.0.dsdl").resolve(), valid_roots ) # relative dsdl path using valid roots but an invalid file path - with expect_raises(DsdlPathInferenceError): + with expect_raises(PathInferenceError): _ = DSDLDefinition._infer_path_to_root_from_first_found(Path("repo/crap/foo/bar/435.baz.1.0.dsdl"), valid_roots) # relative dsdl path with invalid root fragments invalid_root_fragments = [Path("cyphal", "acme")] - with expect_raises(DsdlPathInferenceError): + with expect_raises(PathInferenceError): _ = DSDLDefinition._infer_path_to_root_from_first_found( Path("repo/crap/foo/bar/435.baz.1.0.dsdl"), invalid_root_fragments ) diff --git a/pydsdl/_namespace.py b/pydsdl/_namespace.py index f142539..065c739 100644 --- a/pydsdl/_namespace.py +++ b/pydsdl/_namespace.py @@ -4,17 +4,18 @@ # pylint: disable=logging-not-lazy +from __future__ import annotations import collections import logging from itertools import product, repeat from pathlib import Path -from typing import Callable, DefaultDict, Dict, Iterable, List, Optional, Set, Tuple, Union +from typing import Callable, DefaultDict, Iterable from . import _dsdl_definition, _error, _serializable -from ._dsdl import DsdlFileBuildable, PrintOutputHandler, SortedFileList +from ._dsdl import ReadableDSDLFile, PrintOutputHandler, SortedFileList from ._dsdl import file_sort as dsdl_file_sort from ._dsdl import normalize_paths_argument_to_list -from ._namespace_reader import DsdlDefinitions, read_definitions +from ._namespace_reader import DSDLDefinitions, read_definitions _logger = logging.getLogger(__name__) @@ -79,12 +80,12 @@ class SealingConsistencyError(_error.InvalidDefinitionError): def read_namespace( - root_namespace_directory: Union[Path, str], - lookup_directories: Union[None, Path, str, Iterable[Union[Path, str]]] = None, - print_output_handler: Optional[PrintOutputHandler] = None, + root_namespace_directory: Path | str, + lookup_directories: None | Path | str | Iterable[Path | str] = None, + print_output_handler: PrintOutputHandler | None = None, allow_unregulated_fixed_port_id: bool = False, allow_root_namespace_name_collision: bool = True, -) -> List[_serializable.CompositeType]: +) -> list[_serializable.CompositeType]: """ This function is a main entry point for the library. It reads all DSDL definitions from the specified root namespace directory and produces the annotated AST. @@ -147,12 +148,12 @@ def read_namespace( # pylint: disable=too-many-arguments def read_files( - dsdl_files: Union[None, Path, str, Iterable[Union[Path, str]]], - root_namespace_directories_or_names: Union[None, Path, str, Iterable[Union[Path, str]]], - lookup_directories: Union[None, Path, str, Iterable[Union[Path, str]]] = None, - print_output_handler: Optional[PrintOutputHandler] = None, + dsdl_files: None | Path | str | Iterable[Path | str], + root_namespace_directories_or_names: None | Path | str | Iterable[Path | str], + lookup_directories: None | Path | str | Iterable[Path | str] = None, + print_output_handler: PrintOutputHandler | None = None, allow_unregulated_fixed_port_id: bool = False, -) -> Tuple[List[_serializable.CompositeType], List[_serializable.CompositeType]]: +) -> tuple[list[_serializable.CompositeType], list[_serializable.CompositeType]]: """ This function is a main entry point for the library. It reads all DSDL definitions from the specified ``dsdl_files`` and produces the annotated AST for these types and @@ -251,11 +252,11 @@ def read_files( def _complete_read_function( - target_dsdl_definitions: SortedFileList[DsdlFileBuildable], - lookup_directories_path_list: List[Path], - print_output_handler: Optional[PrintOutputHandler], + target_dsdl_definitions: SortedFileList[ReadableDSDLFile], + lookup_directories_path_list: list[Path], + print_output_handler: PrintOutputHandler | None, allow_unregulated_fixed_port_id: bool, -) -> DsdlDefinitions: +) -> DSDLDefinitions: lookup_dsdl_definitions = _construct_dsdl_definitions_from_namespaces(lookup_directories_path_list) @@ -295,9 +296,9 @@ def _complete_read_function( def _construct_lookup_directories_path_list( root_namespace_directories: Iterable[Path], - lookup_directories_path_list: List[Path], + lookup_directories_path_list: list[Path], allow_root_namespace_name_collision: bool, -) -> List[Path]: +) -> list[Path]: """ Intermediate transformation and validation of inputs into a list of lookup directories as paths. @@ -342,11 +343,11 @@ def _construct_lookup_directories_path_list( def _construct_dsdl_definitions_from_files( - dsdl_files: List[Path], - valid_roots: List[Path], -) -> SortedFileList[DsdlFileBuildable]: + dsdl_files: list[Path], + valid_roots: list[Path], +) -> SortedFileList[ReadableDSDLFile]: """ """ - output = set() # type: Set[DsdlFileBuildable] + output = set() # type: set[ReadableDSDLFile] for fp in dsdl_files: resolved_fp = fp.resolve(strict=False) if resolved_fp.suffix == DSDL_FILE_SUFFIX_LEGACY: @@ -362,14 +363,14 @@ def _construct_dsdl_definitions_from_files( def _construct_dsdl_definitions_from_namespaces( - root_namespace_paths: List[Path], -) -> SortedFileList[DsdlFileBuildable]: + root_namespace_paths: list[Path], +) -> SortedFileList[ReadableDSDLFile]: """ Accepts a directory path, returns a sorted list of abstract DSDL file representations. Those can be read later. The definitions are sorted by name lexicographically, then by major version (greatest version first), then by minor version (same ordering as the major version). """ - source_file_paths: Set[Tuple[Path, Path]] = set() # index of all file paths already found + source_file_paths: set[tuple[Path, Path]] = set() # index of all file paths already found for root_namespace_path in root_namespace_paths: for p in root_namespace_path.rglob(DSDL_FILE_GLOB): source_file_paths.add((p, root_namespace_path)) @@ -386,8 +387,8 @@ def _construct_dsdl_definitions_from_namespaces( def _ensure_no_collisions( - target_definitions: List[DsdlFileBuildable], - lookup_definitions: List[DsdlFileBuildable], + target_definitions: list[ReadableDSDLFile], + lookup_definitions: list[ReadableDSDLFile], ) -> None: for tg in target_definitions: tg_full_namespace_period = tg.full_namespace.lower() + "." @@ -423,7 +424,7 @@ def _ensure_no_collisions( raise DataTypeCollisionError("This type is redefined in %s" % lu.file_path, path=tg.file_path) -def _ensure_no_fixed_port_id_collisions(types: List[_serializable.CompositeType]) -> None: +def _ensure_no_fixed_port_id_collisions(types: list[_serializable.CompositeType]) -> None: for a in types: for b in types: different_names = a.full_name != b.full_name @@ -444,13 +445,13 @@ def _ensure_no_fixed_port_id_collisions(types: List[_serializable.CompositeType] ) -def _ensure_minor_version_compatibility(types: List[_serializable.CompositeType]) -> None: - by_name = collections.defaultdict(list) # type: DefaultDict[str, List[_serializable.CompositeType]] +def _ensure_minor_version_compatibility(types: list[_serializable.CompositeType]) -> None: + by_name = collections.defaultdict(list) # type: DefaultDict[str, list[_serializable.CompositeType]] for t in types: by_name[t.full_name].append(t) for definitions in by_name.values(): - by_major = collections.defaultdict(list) # type: DefaultDict[int, List[_serializable.CompositeType]] + by_major = collections.defaultdict(list) # type: DefaultDict[int, list[_serializable.CompositeType]] for t in definitions: by_major[t.version.major].append(t) @@ -557,7 +558,7 @@ def _ensure_no_namespace_name_collisions_or_nested_root_namespaces( ) -> None: directories = {x.resolve() for x in directories} # normalize the case in case-insensitive filesystems - def check_each(path_tuple_with_result: Tuple[Tuple[Path, Path], List[int]]) -> bool: + def check_each(path_tuple_with_result: tuple[tuple[Path, Path], list[int]]) -> bool: path_tuple = path_tuple_with_result[0] if not path_tuple[0].samefile(path_tuple[1]): if not allow_name_collisions and path_tuple[0].name.lower() == path_tuple[1].name.lower(): @@ -610,7 +611,7 @@ def _unittest_dsdl_definition_constructor() -> None: dsdl_defs = _construct_dsdl_definitions_from_namespaces([root]) print(dsdl_defs) - lut = {x.full_name: x for x in dsdl_defs} # type: Dict[str, DsdlFileBuildable] + lut = {x.full_name: x for x in dsdl_defs} # type: dict[str, ReadableDSDLFile] assert len(lut) == 3 assert str(lut["foo.Qwerty"]) == repr(lut["foo.Qwerty"]) @@ -720,7 +721,7 @@ def _unittest_dsdl_definition_constructor_legacy() -> None: (root / "123.Qwerty.123.234.uavcan").write_text("# TEST A") dsdl_defs = _construct_dsdl_definitions_from_namespaces([root]) print(dsdl_defs) - lut = {x.full_name: x for x in dsdl_defs} # type: Dict[str, DsdlFileBuildable] + lut = {x.full_name: x for x in dsdl_defs} # type: dict[str, ReadableDSDLFile] assert len(lut) == 1 t = lut["foo.Qwerty"] assert t.file_path == root / "123.Qwerty.123.234.uavcan" @@ -743,7 +744,7 @@ def _unittest_common_usage_errors() -> None: root_ns_dir = di / "foo" root_ns_dir.mkdir() - reports = [] # type: List[str] + reports = [] # type: list[str] _ensure_no_common_usage_errors([root_ns_dir], [], reports.append) assert not reports @@ -803,7 +804,6 @@ def _unittest_issue_71() -> None: # https://github.com/OpenCyphal/pydsdl/issues def _unittest_type_read_files_example(temp_dsdl_factory) -> None: # type: ignore - # let's test the comments for the read function dsdl_files = [ Path("workspace/project/types/animals/felines/Tabby.1.0.uavcan"), # keep .uavcan to cover the warning @@ -867,6 +867,8 @@ def _unittest_read_files_empty_args() -> None: def _unittest_ensure_no_collisions(temp_dsdl_factory) -> None: # type: ignore from pytest import raises as expect_raises + _ = temp_dsdl_factory + # gratuitous coverage of the collision check where other tests don't cover some edge cases _ensure_no_namespace_name_collisions_or_nested_root_namespaces([], False) diff --git a/pydsdl/_namespace_reader.py b/pydsdl/_namespace_reader.py index 7295e0f..adb5130 100644 --- a/pydsdl/_namespace_reader.py +++ b/pydsdl/_namespace_reader.py @@ -2,13 +2,14 @@ # Copyright Amazon.com Inc. or its affiliates. # SPDX-License-Identifier: MIT - +from __future__ import annotations import functools import logging +from dataclasses import dataclass from pathlib import Path -from typing import Dict, List, NamedTuple, Optional, Set, cast +from typing import cast -from ._dsdl import DefinitionVisitor, DsdlFile, DsdlFileBuildable, PrintOutputHandler, SortedFileList +from ._dsdl import DefinitionVisitor, DSDLFile, ReadableDSDLFile, PrintOutputHandler, SortedFileList from ._dsdl import file_sort as dsdl_file_sort from ._error import FrontendError, InternalError from ._serializable._composite import CompositeType @@ -16,13 +17,13 @@ # pylint: disable=too-many-arguments def _read_definitions( - target_definitions: SortedFileList[DsdlFileBuildable], - lookup_definitions: SortedFileList[DsdlFileBuildable], - print_output_handler: Optional[PrintOutputHandler], + target_definitions: SortedFileList[ReadableDSDLFile], + lookup_definitions: SortedFileList[ReadableDSDLFile], + print_output_handler: PrintOutputHandler | None, allow_unregulated_fixed_port_id: bool, - direct: Set[CompositeType], - transitive: Set[CompositeType], - file_pool: Dict[Path, DsdlFileBuildable], + direct: set[CompositeType], + transitive: set[CompositeType], + file_pool: dict[Path, ReadableDSDLFile], level: int, ) -> None: """ @@ -30,10 +31,10 @@ def _read_definitions( (recursive method with a lot of arguments. See read_definitions for documentation) """ - _pending_definitions: Set[DsdlFileBuildable] = set() + _pending_definitions: set[ReadableDSDLFile] = set() class _Callback(DefinitionVisitor): - def on_definition(self, _: DsdlFile, dependency_dsdl_file: DsdlFileBuildable) -> None: + def on_definition(self, _: DSDLFile, dependency_dsdl_file: ReadableDSDLFile) -> None: if dependency_dsdl_file.file_path not in file_pool: _pending_definitions.add(dependency_dsdl_file) @@ -43,8 +44,8 @@ def print_handler(file: Path, line: int, message: str) -> None: for target_definition in target_definitions: - if not isinstance(target_definition, DsdlFileBuildable): - raise TypeError("Expected DsdlFileBuildable, got: " + type(target_definition).__name__) + if not isinstance(target_definition, ReadableDSDLFile): + raise TypeError("Expected ReadableDSDLFile, got: " + type(target_definition).__name__) target_definition = file_pool.setdefault(target_definition.file_path, target_definition) # make sure we are working with the same object for a given file path @@ -98,21 +99,24 @@ def print_handler(file: Path, line: int, message: str) -> None: # +---[FILE: PUBLIC]--------------------------------------------------------------------------------------------------+ -DsdlDefinitions = NamedTuple( - "DsdlDefinitions", [("direct", SortedFileList[CompositeType]), ("transitive", SortedFileList[CompositeType])] -) -""" -Common DSDL definition set including the direct dependencies requested and the transitive dependencies found. The former -and latter sets will be disjoint. -""" + +@dataclass(frozen=True) +class DSDLDefinitions: + """ + Common DSDL definition set including the direct dependencies requested and the transitive dependencies found. + The former and latter sets will be disjoint. + """ + + direct: SortedFileList[CompositeType] + transitive: SortedFileList[CompositeType] def read_definitions( - target_definitions: SortedFileList[DsdlFileBuildable], - lookup_definitions: SortedFileList[DsdlFileBuildable], - print_output_handler: Optional[PrintOutputHandler], + target_definitions: SortedFileList[ReadableDSDLFile], + lookup_definitions: SortedFileList[ReadableDSDLFile], + print_output_handler: PrintOutputHandler | None, allow_unregulated_fixed_port_id: bool, -) -> DsdlDefinitions: +) -> DSDLDefinitions: """ Given a set of DSDL files, this method reads the text and invokes the parser for each and for any files found in the lookup set where these are used by the target set. @@ -125,9 +129,9 @@ def read_definitions( :raises InvalidDefinitionError: If a dependency is missing. :raises InternalError: If an unexpected error occurs. """ - _direct: Set[CompositeType] = set() - _transitive: Set[CompositeType] = set() - _file_pool: Dict[Path, DsdlFileBuildable] = {} + _direct: set[CompositeType] = set() + _transitive: set[CompositeType] = set() + _file_pool: dict[Path, ReadableDSDLFile] = {} _read_definitions( target_definitions, lookup_definitions, @@ -138,7 +142,7 @@ def read_definitions( _file_pool, 0, ) - return DsdlDefinitions( + return DSDLDefinitions( dsdl_file_sort(_direct), dsdl_file_sort(_transitive), ) @@ -151,8 +155,8 @@ def _unittest_namespace_reader_read_definitions(temp_dsdl_factory) -> None: # t from . import _dsdl_definition target = temp_dsdl_factory.new_file(Path("root", "ns", "Target.1.1.dsdl"), "@sealed") - target_definitions = [cast(DsdlFileBuildable, _dsdl_definition.DSDLDefinition(target, target.parent))] - lookup_definitions: List[DsdlFileBuildable] = [] + target_definitions = [cast(ReadableDSDLFile, _dsdl_definition.DSDLDefinition(target, target.parent))] + lookup_definitions: list[ReadableDSDLFile] = [] read_definitions(target_definitions, lookup_definitions, None, True) @@ -203,8 +207,8 @@ def _unittest_namespace_reader_read_definitions_multiple_no_load(temp_dsdl_facto # never be read thus it will not be an error that it does not exist. ] - target_definitions = [cast(DsdlFileBuildable, _dsdl_definition.DSDLDefinition(t, t.parent)) for t in targets] - lookup_definitions = [cast(DsdlFileBuildable, _dsdl_definition.DSDLDefinition(a, a.parent)) for a in dependencies] + target_definitions = [cast(ReadableDSDLFile, _dsdl_definition.DSDLDefinition(t, t.parent)) for t in targets] + lookup_definitions = [cast(ReadableDSDLFile, _dsdl_definition.DSDLDefinition(a, a.parent)) for a in dependencies] _ = read_definitions( target_definitions, lookup_definitions, @@ -377,11 +381,8 @@ def _unittest_namespace_reader_read_defs_target_dont_allow_unregulated(temp_dsdl def _unittest_namespace_reader_type_error() -> None: - from pytest import raises as assert_raises - from . import _dsdl_definition - with assert_raises(TypeError): read_definitions( [""], # type: ignore diff --git a/pydsdl/_parser.py b/pydsdl/_parser.py index 4eda81a..cc25631 100644 --- a/pydsdl/_parser.py +++ b/pydsdl/_parser.py @@ -2,13 +2,14 @@ # This software is distributed under the terms of the MIT License. # Author: Pavel Kirienko +from __future__ import annotations import typing import logging import itertools import functools import fractions from pathlib import Path -from typing import cast, List, Tuple +from typing import cast, Tuple import parsimonious from parsimonious.nodes import Node as _Node from . import _error @@ -160,6 +161,7 @@ def generic_visit(self, node: _Node, visited_children: typing.Sequence[typing.An return tuple(visited_children) or node def visit_line(self, node: _Node, children: _Children) -> None: + _ = children if len(node.text) == 0: # Line is empty, flush comment self._flush_comment() @@ -174,6 +176,7 @@ def visit_end_of_line(self, _n: _Node, _c: _Children) -> None: visit_statement_directive = _make_typesafe_child_lifter(type(None)) # nodes are above the top level. def visit_comment(self, node: _Node, children: _Children) -> None: + _ = children assert isinstance(node.text, str) self._comment += "\n" if self._comment != "" else "" self._comment += node.text[2:] if node.text.startswith("# ") else node.text[1:] @@ -302,7 +305,7 @@ def visit_type_bit_length_suffix(self, node: _Node, _c: _Children) -> int: visit_op2_exp = parsimonious.NodeVisitor.lift_child def visit_expression_list(self, _n: _Node, children: _Children) -> Tuple[_expression.Any, ...]: - out = [] # type: List[_expression.Any] + out = [] # type: list[_expression.Any] if children: children = children[0] assert len(children) == 2 diff --git a/pydsdl/_test.py b/pydsdl/_test.py index ca9c373..d3dbecb 100644 --- a/pydsdl/_test.py +++ b/pydsdl/_test.py @@ -5,8 +5,9 @@ # cSpell: words iceb # pylint: disable=global-statement,protected-access,too-many-statements,consider-using-with,redefined-outer-name +from __future__ import annotations import tempfile -from typing import Union, Tuple, Optional, Sequence, Type, Iterable +from typing import Sequence, Type, Iterable from pathlib import Path from textwrap import dedent import pytest # This is only safe to import in test files! @@ -29,7 +30,7 @@ def __init__(self) -> None: def directory(self) -> Path: return Path(self._tmp_dir.name) - def new(self, rel_path: Union[str, Path], text: str) -> None: + def new(self, rel_path: str | Path, text: str) -> None: """ Simply creates a new DSDL source file with the given contents at the specified path inside the workspace. """ @@ -38,7 +39,7 @@ def new(self, rel_path: Union[str, Path], text: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(text, encoding="utf8") - def parse_new(self, rel_path: Union[str, Path], text: str) -> _dsdl_definition.DSDLDefinition: + def parse_new(self, rel_path: str | Path, text: str) -> _dsdl_definition.DSDLDefinition: """ Creates a new DSDL source file with the given contents at the specified path inside the workspace, then parses it and returns the resulting definition object. @@ -747,7 +748,7 @@ def standalone(rel_path: str, definition: str, allow_unregulated: bool = False) def _unittest_print(wrkspc: Workspace) -> None: - printed_items = None # type: Optional[Tuple[int, str]] + printed_items = None # type: tuple[int, str] | None def print_handler(line_number: int, text: str) -> None: nonlocal printed_items @@ -991,7 +992,7 @@ def _unittest_assert(wrkspc: Workspace) -> None: def _unittest_parse_namespace(wrkspc: Workspace) -> None: from pytest import raises - print_output = None # type: Optional[Tuple[str, int, str]] + print_output = None # type: tuple[str, int, str] | None def print_handler(d: Path, line: int, text: str) -> None: nonlocal print_output