|
| 1 | +import re |
| 2 | +from typing import Annotated |
| 3 | +from uuid import UUID |
| 4 | + |
| 5 | +from pydantic import AfterValidator, Field |
| 6 | + |
| 7 | +__all__ = [ |
| 8 | + "NQN" |
| 9 | +] |
| 10 | + |
| 11 | +MIN_NQN_LEN = 11 |
| 12 | +MAX_NQN_LEN = 223 |
| 13 | + |
| 14 | +NVMET_NQN_UUID_PREFIX = "nqn.2014-08.org.nvmexpress:uuid:" |
| 15 | +NVMET_NQN_UUID_PREFIX_LEN = 32 |
| 16 | +NVMET_DISCOVERY_NQN = "nqn.2014-08.org.nvmexpress.discovery" |
| 17 | +UUID_STRING_LEN = 36 |
| 18 | +NVMET_NQN_UUID_LEN = NVMET_NQN_UUID_PREFIX_LEN + UUID_STRING_LEN |
| 19 | + |
| 20 | +NQN_DATE_PATTERN = re.compile(r"^nqn.\d{4}-\d{2}\..*") |
| 21 | +DOMAIN_PATTERN = re.compile(r'^((?!-)[A-Za-z0-9-]{1,63}(?<!-)\.)+[A-Za-z]{2,}$') |
| 22 | + |
| 23 | + |
| 24 | +def _validate_nqn(nqn: str): |
| 25 | + if nqn == NVMET_DISCOVERY_NQN: |
| 26 | + return nqn |
| 27 | + elif nqn.startswith(NVMET_NQN_UUID_PREFIX): |
| 28 | + # Check for "nqn.2014-08.org.nvmexpress:uuid:11111111-2222-3333-4444-555555555555" |
| 29 | + if len(nqn) != NVMET_NQN_UUID_LEN: |
| 30 | + raise ValueError(f'{nqn}: uuid is incorrect length') |
| 31 | + UUID(nqn[NVMET_NQN_UUID_PREFIX_LEN:]) |
| 32 | + elif nqn.startswith("nqn."): |
| 33 | + # Check for "nqn.yyyy-mm.reverse.domain:user-string" |
| 34 | + if not NQN_DATE_PATTERN.match(nqn): |
| 35 | + raise ValueError(f'{nqn}: must start with "nqn.YYYY-MM.<reverse.domain>"') |
| 36 | + # Now check the domain |
| 37 | + if not nqn[12:] or '.' not in nqn[12:]: |
| 38 | + raise ValueError(f'{nqn}: must start with "nqn.YYYY-MM.<reverse.domain>" - domain missing') |
| 39 | + reverse_domain = nqn[12:].split(':', 1)[0] |
| 40 | + domain_parts = reverse_domain.split('.') |
| 41 | + domain_parts.reverse() |
| 42 | + if not DOMAIN_PATTERN.match('.'.join(domain_parts)): |
| 43 | + raise ValueError(f'{nqn}: domain does not appear to be valid') |
| 44 | + else: |
| 45 | + raise ValueError(f'{nqn}: must start with "nqn"') |
| 46 | + return nqn |
| 47 | + |
| 48 | + |
| 49 | +NQN = Annotated[str, Field(min_length=MIN_NQN_LEN, max_length=MAX_NQN_LEN), AfterValidator(_validate_nqn)] |
0 commit comments