Skip to content

Commit

Permalink
Allow relative target paths
Browse files Browse the repository at this point in the history
Fix for issue OpenCyphal#109. This relaxes some logic that required existing paths before creating a DSDLFile abstraction while adding a requirement that ReadableDSDLFile(s) are created with files that exist (i.e. it can't be "readable" if it doesn't exist). At the same time, we clarified (and fixed) the difference between the `root_namespace_directories_or_names` and `lookup_directories arguments` of read_files. Our new guidance is to dis-use `lookup_directories` unless there is a need to separate target lookup paths from dependent type look up paths. This was an unstated design goal that I forgot about and subsequently mis-implemented. This does change the behaviour of the API slightly which is why I've bumped the minor version instead of just the patch.
  • Loading branch information
thirtytwobits committed Jul 16, 2024
1 parent c0afc34 commit 22e5ea2
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 77 deletions.
2 changes: 1 addition & 1 deletion pydsdl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import sys as _sys
from pathlib import Path as _Path

__version__ = "1.21.1"
__version__ = "1.22.0"
__version_info__ = tuple(map(int, __version__.split(".")[:3]))
__license__ = "MIT"
__author__ = "OpenCyphal"
Expand Down
18 changes: 17 additions & 1 deletion pydsdl/_dsdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
# SPDX-License-Identifier: MIT

from __future__ import annotations

from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Callable, Iterable, TypeVar, List, Tuple
from typing import Any, Callable, Iterable, List, Tuple, TypeVar

from ._error import InvalidDefinitionError
from ._serializable import CompositeType, Version

PrintOutputHandler = Callable[[Path, int, str], None]
Expand Down Expand Up @@ -102,8 +104,22 @@ def root_namespace_path(self) -> Path:
class ReadableDSDLFile(DSDLFile):
"""
A DSDL file that can construct a composite type from its contents.
:param dsdl_path: The path to the alleged DSDL file. This file must exist for readable objects.
:raises InvalidDefinitionError: If the file does not exist.
"""

def __init__(self, file_path: Path, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._file_path = file_path
if not self.file_path.exists():
raise InvalidDefinitionError(
"Attempt to construct ReadableDSDLFile object for file that doesn't exist.", self.file_path
)

@property
def file_path(self) -> Path:
return self._file_path

@abstractmethod
def read(
self,
Expand Down
137 changes: 80 additions & 57 deletions pydsdl/_dsdl_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,20 @@ def _infer_path_to_root_from_first_found(cls, dsdl_path: Path, valid_dsdl_roots:
if valid_dsdl_roots is None:
raise ValueError("valid_dsdl_roots was None")

if dsdl_path.is_absolute() and len(valid_dsdl_roots) == 0:
raise PathInferenceError(
f"dsdl_path ({dsdl_path}) is absolute and the provided valid root names are empty. The DSDL root of "
"an absolute path cannot be inferred without this information.",
dsdl_path,
valid_dsdl_roots,
)

if len(valid_dsdl_roots) == 0:
# if we have no valid roots we can only infer the root of the path. We require the path to be relative
# to avoid accidental inferences given that dsdl file trees starting from a filesystem root are rare.
return Path(dsdl_path.parts[0])

# INFERENCE 1: The strongest inference is when the path is relative to a known root.
# if we have no valid roots we can only infer the root of the path.
directly_inferred = Path(dsdl_path.parts[0]).resolve(strict=False)
if not directly_inferred.exists():
raise PathInferenceError(
f"No valid root found in path {str(dsdl_path)} and the inferred root {str(directly_inferred)} "
"does not exist.",
dsdl_path,
valid_dsdl_roots,
)
return directly_inferred

# INFERENCE 1: The easiest inference is when the target path is relative to a known dsdl root. These operations
# should work with pure paths and not require filesystem access.
resolved_dsdl_path = dsdl_path.resolve(strict=False) if dsdl_path.is_absolute() else None
for path_to_root in valid_dsdl_roots:
# First we try the paths as-is...
Expand All @@ -89,7 +89,27 @@ def _infer_path_to_root_from_first_found(cls, dsdl_path: Path, valid_dsdl_roots:
else:
return path_to_root_resolved

# INFERENCE 2: A weaker, but valid inference is when the path is a child of a known root folder name.
# INFERENCE 2: If the target is relative then we can try to find a valid root by looking for the file in the
# root directories. This is a stronger inference than the previous one because it requires the file to exist
# but we do it second because it reads the filesystem.
if not dsdl_path.is_absolute():
for path_to_root in valid_dsdl_roots:
path_to_root_parent = path_to_root
while path_to_root_parent != path_to_root_parent.parent:
# Weld together and check only if the root's last part is the same name as the target's first part.
# yes:
# path/to/root + root/then/Type.1.0.dsdl <- /root == root/
# no:
# path/to/not_root + root/then/Type.1.0.dsdl <- /not_root != root/
if (
path_to_root_parent.parts[-1] == dsdl_path.parts[0]
and (path_to_root_parent.parent / dsdl_path).exists()
):
return path_to_root_parent
path_to_root_parent = path_to_root_parent.parent

# INFERENCE 3: A weaker, but valid inference is when the target path is a child of a known root folder name.
# This is only allowed if dsdl roots are top-level namespace names and not paths.
root_parts = [x.parts[-1] for x in valid_dsdl_roots if len(x.parts) == 1]
parts = list(dsdl_path.parent.parts)
for i, part in list(enumerate(parts)):
Expand All @@ -102,24 +122,31 @@ def _infer_path_to_root_from_first_found(cls, dsdl_path: Path, valid_dsdl_roots:
def from_first_in(cls: Type["DSDLDefinition"], dsdl_path: Path, valid_dsdl_roots: list[Path]) -> "DSDLDefinition":
"""
Creates a DSDLDefinition object by inferring the path to the namespace root of a DSDL file given a set
of valid roots. The logic used prefers an instance of dsdl_path found to exist under a valid root but
will degrade to pure-path string matching if no file is found. Because this logic uses the first root path
that passes one of these two inferences the order of the valid_dsdl_roots list matters.
of valid roots and, if the dsdl path is relative, resolving the dsdl path relative to said roots. The logic used
prefers an instance of `dsdl_path` found to exist under a valid root but will degrade to pure-path string
matching if no file is found (If this does not yield a valid path to an existing dsdl file an exception is
raised). Because this logic uses the first root path that passes one of these two inferences the order of the
valid_dsdl_roots list matters.
:param dsdl_path: The path to the alleged DSDL file.
:param valid_dsdl_roots: The ordered set of valid root names or paths under which the type must reside.
This argument is accepted as a list for ordering but no de-duplication is performed
as the caller is expected to provide a correct set of paths.
:return A new DSDLDefinition object
:raises PathInferenceError: If the namespace root cannot be inferred from the provided information.
:raises InvalidDefinitionError: If the file does not exist.
"""
return cls(dsdl_path, cls._infer_path_to_root_from_first_found(dsdl_path, valid_dsdl_roots))
root_path = cls._infer_path_to_root_from_first_found(dsdl_path, valid_dsdl_roots)
if not dsdl_path.is_absolute():
dsdl_path_resolved = (root_path.parent / dsdl_path).resolve(strict=False)
else:
dsdl_path_resolved = dsdl_path.resolve(strict=False)
return cls(dsdl_path_resolved, root_path)

def __init__(self, file_path: Path, root_namespace_path: Path):
""" """
# Normalizing the path and reading the definition text
self._file_path = Path(file_path)
del file_path
super().__init__(file_path)
self._root_namespace_path = Path(root_namespace_path)
del root_namespace_path
self._text: str | None = None
Expand All @@ -128,7 +155,7 @@ def __init__(self, file_path: Path, root_namespace_path: Path):
if CompositeType.NAME_COMPONENT_SEPARATOR in self._root_namespace_path.name:
raise FileNameFormatError("Invalid namespace name", path=self._root_namespace_path)

relative_path = self._root_namespace_path.name / self._file_path.relative_to(self._root_namespace_path)
relative_path = self._root_namespace_path.name / self.file_path.relative_to(self._root_namespace_path)

# Parsing the basename, e.g., 434.GetTransportStatistics.0.1.dsdl
basename_components = relative_path.name.split(".")[:-1]
Expand All @@ -138,7 +165,7 @@ def __init__(self, file_path: Path, root_namespace_path: Path):
elif len(basename_components) == 3:
short_name, str_major_version, str_minor_version = basename_components
else:
raise FileNameFormatError("Invalid file name", path=self._file_path)
raise FileNameFormatError("Invalid file name", path=self.file_path)

# Parsing the fixed port ID, if specified; None if not
if str_fixed_port_id is not None:
Expand All @@ -150,7 +177,7 @@ def __init__(self, file_path: Path, root_namespace_path: Path):
"Namespaces are defined as directories; putting the namespace name in the file name will not work. "
'For example: "foo/Bar.1.0.dsdl" is OK (where "foo" is a directory); "foo.Bar.1.0.dsdl" is not.'
% str_fixed_port_id,
path=self._file_path,
path=self.file_path,
) from None
else:
self._fixed_port_id = None
Expand All @@ -159,13 +186,13 @@ def __init__(self, file_path: Path, root_namespace_path: Path):
try:
self._version = Version(major=int(str_major_version), minor=int(str_minor_version))
except ValueError:
raise FileNameFormatError("Could not parse the version numbers", path=self._file_path) from None
raise FileNameFormatError("Could not parse the version numbers", path=self.file_path) from None

# Finally, constructing the name
namespace_components = list(relative_path.parent.parts)
for nc in namespace_components:
if CompositeType.NAME_COMPONENT_SEPARATOR in nc:
raise FileNameFormatError(f"Invalid name for namespace component: {nc!r}", path=self._file_path)
raise FileNameFormatError(f"Invalid name for namespace component: {nc!r}", path=self.file_path)
self._name: str = CompositeType.NAME_COMPONENT_SEPARATOR.join(namespace_components + [str(short_name)])

self._cached_type: CompositeType | None = None
Expand All @@ -185,9 +212,6 @@ def read(
_logger.debug("%s: Cache hit", log_prefix)
return self._cached_type

if not self._file_path.exists():
raise InvalidDefinitionError("Attempt to read DSDL file that doesn't exist.", self._file_path)

started_at = time.monotonic()

# Remove the target definition from the lookup list in order to prevent
Expand Down Expand Up @@ -258,7 +282,7 @@ def root_namespace(self) -> str:
@property
def text(self) -> str:
if self._text is None:
with open(self._file_path) as f:
with open(self.file_path) as f:
self._text = str(f.read())
return self._text

Expand All @@ -274,10 +298,6 @@ def fixed_port_id(self) -> int | None:
def has_fixed_port_id(self) -> bool:
return self.fixed_port_id is not None

@property
def file_path(self) -> Path:
return self._file_path

@property
def root_namespace_path(self) -> Path:
return self._root_namespace_path
Expand All @@ -298,12 +318,15 @@ def __eq__(self, other: object) -> bool:
return NotImplemented # pragma: no cover

def __str__(self) -> str:
return "DSDLDefinition(full_name=%r, version=%r, fixed_port_id=%r, file_path=%s)" % (
self.full_name,
self.version,
self.fixed_port_id,
self.file_path,
)
try:
return "DSDLDefinition(full_name=%r, version=%r, fixed_port_id=%r, file_path=%s)" % (
self.full_name,
self.version,
self.fixed_port_id,
self.file_path,
)
except AttributeError: # pragma: no cover
return "DSDLDefinition(UNINITIALIZED)"

__repr__ = __str__

Expand All @@ -315,13 +338,8 @@ def _unittest_dsdl_definition_read_non_existent() -> None:
from pytest import raises as expect_raises

target = Path("root", "ns", "Target.1.1.dsdl")
target_definition = DSDLDefinition(target, target.parent)

def print_output(line_number: int, text: str) -> None: # pragma: no cover
_ = line_number, text

with expect_raises(InvalidDefinitionError):
target_definition.read([], [], print_output, True)
_ = DSDLDefinition(target, target.parent)


def _unittest_dsdl_definition_read_text(temp_dsdl_factory) -> None: # type: ignore
Expand Down Expand Up @@ -354,16 +372,11 @@ def _unittest_type_from_path_inference() -> None:
# case the method assumes that the relative path is the correct and complete namespace of the type:

# relative path
root = DSDLDefinition._infer_path_to_root_from_first_found(Path("uavcan/foo/bar/435.baz.1.0.dsdl"), [])
root = DSDLDefinition._infer_path_to_root_from_first_found(
Path("uavcan/foo/bar/435.baz.1.0.dsdl"), [Path("uavcan")]
)
assert root == Path("uavcan")

# The root namespace is not inferred in an absolute path without additional data:

with expect_raises(PathInferenceError):
_ = DSDLDefinition._infer_path_to_root_from_first_found(
Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), []
)

with expect_raises(ValueError):
_ = DSDLDefinition._infer_path_to_root_from_first_found(
Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), None # type: ignore
Expand Down Expand Up @@ -458,8 +471,18 @@ def _unittest_type_from_path_inference() -> None:
assert root == Path("repo/uavcan")


def _unittest_from_first_in() -> None:
dsdl_def = DSDLDefinition.from_first_in(
Path("/repo/uavcan/foo/bar/435.baz.1.0.dsdl").resolve(), [Path("/repo/uavcan/foo/..").resolve()]
)
def _unittest_type_from_path_inference_edge_case(temp_dsdl_factory) -> None: # type: ignore
"""
Edge case where we target a file where the namespace is under the root path.
"""
# pylint: disable=protected-access

target_file = temp_dsdl_factory.new_file(Path("dsdl_root/Type.1.0.dsdl"), "@sealed")
root = DSDLDefinition._infer_path_to_root_from_first_found(target_file, [])
assert root == Path(target_file.root)


def _unittest_from_first_in(temp_dsdl_factory) -> None: # type: ignore
dsdl_file = temp_dsdl_factory.new_file(Path("repo/uavcan/foo/bar/435.baz.1.0.dsdl"), "@sealed")
dsdl_def = DSDLDefinition.from_first_in(dsdl_file.resolve(), [(dsdl_file.parent.parent / "..")])
assert dsdl_def.full_name == "uavcan.foo.bar.baz"
Loading

0 comments on commit 22e5ea2

Please sign in to comment.