|
| 1 | +"""Classes and functions for populating a validation context.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import itertools |
| 6 | +import json |
| 7 | +from functools import cache |
| 8 | + |
| 9 | +import attrs |
| 10 | +from bidsschematools.types import Namespace |
| 11 | +from bidsschematools.types import context as ctx |
| 12 | +from upath import UPath |
| 13 | + |
| 14 | +from .types import _typings as t |
| 15 | +from .types.files import FileTree |
| 16 | + |
| 17 | +TYPE_CHECKING = False |
| 18 | +if TYPE_CHECKING: |
| 19 | + from collections.abc import Generator |
| 20 | + |
| 21 | + from bidsschematools.types import protocols as proto |
| 22 | + |
| 23 | + # pyright does not treat cached_property like property |
| 24 | + cached_property = property |
| 25 | +else: |
| 26 | + from functools import cached_property |
| 27 | + |
| 28 | + |
| 29 | +# Design strategy for these classes. |
| 30 | +# |
| 31 | +# * Attribute access is essential. Dictionary lookups must become either |
| 32 | +# dataclasses or Namespace-like objects. |
| 33 | +# * Objects should have a single source of truth, such as a FileTree object. |
#   When defining derived fields, prefer @property if the lookup is trivial,
#   or cached_property if it is nontrivial.
| 36 | +# * Hitting the filesystem should be minimized. FileTree already caches os.stat calls, |
| 37 | +# but reading file contents should never happen more than once. Therefore, if file |
| 38 | +# contents are accessed in the context.associations or context.dataset, they should |
| 39 | +# use caching loaders. |
| 40 | +# * If the full contents of an object will be known on its first instantiation, |
| 41 | +# prefer to use the dataclasses in bidsschematools.types.context. Lazy fields |
| 42 | +# need custom classes. |
| 43 | + |
| 44 | + |
class ValidationError(Exception):
    """Raised when validation cannot proceed (e.g., ambiguous associations).

    TODO: Add issue structure.
    """
| 47 | + |
| 48 | + |
@cache
def load_tsv(file: FileTree, *, max_rows: int = 0) -> Namespace:
    """Load TSV contents into a Namespace mapping column names to value tuples.

    Cached, so each file's contents are read at most once.

    Parameters
    ----------
    file : FileTree
        The TSV file to read. Assumes FileTree is acceptable to ``open()``
        (os.PathLike or similar) -- TODO confirm.
    max_rows : int
        If positive, read at most this many lines, including the header row.

    Returns
    -------
    Namespace
        One entry per column; empty if the file has no header line.
    """
    with open(file) as fobj:
        if max_rows > 0:
            fobj = itertools.islice(fobj, max_rows)
        contents = (line.rstrip('\r\n').split('\t') for line in fobj)
        # Extract headers; an empty file has no header and yields no columns.
        header = next(contents, None)
        if header is None:
            return Namespace({})
        # Transpose rows to columns. NOTE(review): zip truncates to the
        # shortest row, so ragged rows silently drop cells -- confirm intended.
        return Namespace(zip(header, zip(*contents)))
| 58 | + |
| 59 | + |
@cache
def load_json(file: FileTree) -> dict[str, t.Any]:
    """Load JSON file contents.

    Cached, so each file is read and parsed at most once; repeated calls
    return the same object.

    Parameters
    ----------
    file : FileTree
        The JSON file to read. Assumes FileTree is acceptable to ``open()``.

    Returns
    -------
    dict[str, t.Any]
        Parsed contents. NOTE(review): annotation assumes the JSON root is
        an object; json.load may return any JSON type -- confirm callers.
    """
    with open(file) as fobj:
        return json.load(fobj)
| 65 | + |
| 66 | + |
class Subjects:
    """Collections of subjects in the dataset.

    Wraps a FileTree (the single source of truth) and derives subject lists
    lazily from directory names and participant_id columns.
    """

    def __init__(self, tree: FileTree):
        self._tree = tree

    @cached_property
    def sub_dirs(self) -> list[str]:
        """Subjects as determined by sub-* directories."""
        return [
            child.name
            for child in self._tree.children.values()
            if child.is_dir and child.name.startswith('sub-')
        ]

    @property
    def participant_id(self) -> list[str] | None:
        """The participant_id column of participants.tsv.

        Returns None if participants.tsv is absent or lacks the column.
        """
        if 'participants.tsv' not in self._tree.children:
            return None

        return self._get_participant_id(self._tree.children['participants.tsv'])

    @cached_property
    def phenotype(self) -> list[str] | None:
        """The union of participant_id columns in phenotype files.

        Returns None if there is no phenotype directory.
        """
        if 'phenotype' not in self._tree.children:
            return None

        subjects = set()
        # children maps names to FileTree nodes; iterate the nodes, not the keys.
        for phenotype_file in self._tree.children['phenotype'].children.values():
            if phenotype_file.name.endswith('.tsv'):
                participant_ids = self._get_participant_id(phenotype_file)
                # Files without a participant_id column contribute nothing.
                if participant_ids is not None:
                    subjects.update(participant_ids)

        return sorted(subjects)

    @staticmethod
    def _get_participant_id(phenotype_file: FileTree) -> list[str] | None:
        """Return the participant_id column of a TSV file, or None if missing."""
        columns = load_tsv(phenotype_file)
        if 'participant_id' not in columns:
            return None
        return list(columns['participant_id'])
| 109 | + |
| 110 | + |
@attrs.define
class Dataset:
    """A dataset object that loads properties on first access."""

    tree: FileTree  # root of the dataset's file hierarchy (source of truth)
    ignored: list[str] = attrs.field(factory=list)  # presumably ignored paths -- TODO confirm semantics
    subjects: Subjects = attrs.field(init=False)  # derived; populated in __attrs_post_init__

    def __attrs_post_init__(self) -> None:
        # Derived from the tree rather than supplied by callers.
        self.subjects = Subjects(self.tree)

    @cached_property
    def dataset_description(self) -> Namespace:
        """Contents of '/dataset_description.json'."""
        # Raises KeyError if the file is absent from the tree root.
        # NOTE(review): reads via UPath rather than the cached load_json
        # helper -- confirm this bypass is intentional.
        return Namespace.from_json(
            UPath(self.tree.children['dataset_description.json']).read_text()
        )

    @cached_property
    def modalities(self) -> list[str]:
        """List of modalities found in the dataset."""
        # TODO: not yet implemented; placeholder returns an empty list.
        ...
        return []

    @cached_property
    def datatypes(self) -> list[str]:
        """List of datatypes found in the dataset."""
        # TODO: not yet implemented; placeholder returns an empty list.
        ...
        return []
| 140 | + |
| 141 | + |
@attrs.define
class Association:
    """Generic association between a data file and a supporting file.

    Exposes the associated file's dataset-relative path.
    """

    _file: FileTree

    @property
    def path(self):
        """Dataset-relative path of the associated file."""
        associated_file = self._file
        return associated_file.relative_path
| 152 | + |
| 153 | + |
def load_file(file: FileTree, dataset: proto.Dataset) -> ctx.Context:
    """Load a full context for a given file.

    TODO: incomplete -- currently computes associations and discards them;
    no ctx.Context is constructed, so the function falls through to None.
    """
    associations = load_associations(file, dataset)
    _ = associations
| 158 | + |
| 159 | + |
def load_associations(file: FileTree, dataset: proto.Dataset) -> ctx.Associations:
    """Load all associations for a given file.

    TODO: not yet implemented; currently returns None despite the annotation.
    """
    # If something fails, return None.
    # Uses walk back algorithm
    # https://bids-validator.readthedocs.io/en/latest/validation-model/inheritance-principle.html
    # Stops on first success
| 166 | + |
| 167 | + |
def load_events(file: FileTree) -> ctx.Events:
    """Load events.tsv file.

    TODO: not yet implemented; currently returns None despite the annotation.
    """
| 170 | + |
| 171 | + |
def load_sidecar(file: FileTree) -> dict[str, t.Any]:
    """Load sidecar metadata, using the inheritance principle.

    TODO: not yet implemented; currently returns None despite the annotation.
    """
    # Uses walk back algorithm
    # https://bids-validator.readthedocs.io/en/latest/validation-model/inheritance-principle.html
    # Accumulates all sidecars
| 177 | + |
| 178 | + |
def walk_back(
    source: FileTree,
    inherit: bool,
    target_extensions: tuple[str, ...] = ('.json',),
    target_suffix: str | None = None,
    target_entities: tuple[str, ...] = (),
) -> Generator[FileTree] | Generator[list[FileTree]]:
    """Walk up the file tree to find associated files.

    Parameters
    ----------
    source : FileTree
        The file whose associations are sought.
    inherit : bool
        If True, continue searching parent directories; otherwise stop
        after the source's own directory.
    target_extensions : tuple[str, ...]
        Extensions a candidate file may have.
    target_suffix : str | None
        Suffix a candidate must have; defaults to the source's suffix.
    target_entities : tuple[str, ...]
        Entities permitted to differ from the source. When non-empty,
        whole groups of candidates are yielded instead of single files.

    Yields
    ------
    FileTree or list[FileTree]
        Single matches, or match groups when ``target_entities`` is set.

    Raises
    ------
    ValidationError
        If more than one file matches at the same directory level.
    """
    for file_group in _walk_back(
        source, inherit, target_extensions, target_suffix, target_entities
    ):
        if target_entities:
            yield file_group
        elif len(file_group) == 1:
            yield file_group[0]
        elif file_group:
            # Two or more candidates at one level is ambiguous.
            raise ValidationError('Multiple matching files.')
        # Empty groups are skipped so the search continues up the tree.
| 196 | + |
| 197 | + |
def _walk_back(
    source: FileTree,
    inherit: bool,
    target_extensions: tuple[str, ...],
    target_suffix: str | None,
    target_entities: tuple[str, ...],
) -> Generator[list[FileTree]]:
    """Yield groups of candidate associated files, one group per directory level.

    Starts in the source file's directory and, if ``inherit`` is True,
    continues toward the tree root.
    """
    file_parts = FileParts.from_file(source)

    if target_suffix is None:
        target_suffix = file_parts.suffix

    tree = source.parent
    while tree:
        matches = []
        # children maps names to FileTree nodes; iterate the nodes, not the keys.
        for child in tree.children.values():
            if child.is_dir:
                continue
            parts = FileParts.from_file(child)
            # Membership test: the candidate's extension must be one of the
            # accepted extensions.
            if parts.extension not in target_extensions:
                continue
            if parts.suffix != target_suffix:
                continue
            # A candidate matches when each of its entities is either
            # explicitly allowed to vary (target_entities) or equal to the
            # source's value for that entity.
            if all(
                key in target_entities or file_parts.entities.get(key) == value
                for key, value in parts.entities.items()
            ):
                matches.append(child)

        yield matches
        if not inherit:
            break
        tree = tree.parent
| 231 | + |
| 232 | + |
@attrs.define
class FileParts:
    """BIDS-relevant components of a file path."""

    path: str  # dataset-absolute path, always prefixed with '/'
    stem: str  # filename up to the first '.'
    entities: dict[str, str | None]  # entity key -> value; None for valueless entities
    datatype: str | None
    suffix: str | None
    extension: str | None  # includes leading '.'; trailing '/' for directories

    @classmethod
    def from_file(cls, file: FileTree, schema: Namespace | None = None) -> t.Self:
        """Parse file parts from FileTree object.

        Parameters
        ----------
        file : FileTree
            The file (or directory) to decompose.
        schema : Namespace, optional
            BIDS schema used to recognize datatype directories. When omitted,
            ``datatype`` is left as None.
        """
        # Split on the FIRST dot so multi-part extensions (.nii.gz) stay intact.
        stem, _, extension = file.name.partition('.')

        if extension:
            extension = f'.{extension}'
        if file.is_dir:
            # Directories are distinguished by a trailing slash.
            extension = f'{extension}/'

        datatype = None
        if schema is not None and file.parent:
            # A file's datatype is its parent directory name, if that name is
            # a datatype known to the schema.
            if any(file.parent.name == dtype.value for dtype in schema.objects.datatypes.values()):
                datatype = file.parent.name

        # The final underscore-delimited token is the suffix; the rest are
        # 'key-value' entity strings (value may be absent).
        *entity_strings, suffix = stem.split('_')
        entities = {
            key: vals[0] if vals else None
            for key, *vals in (string.split('-', 1) for string in entity_strings)
        }

        return cls(
            path=f'/{file.relative_path}',
            stem=stem,
            entities=entities,
            datatype=datatype,
            suffix=suffix,
            extension=extension,
        )
0 commit comments