Skip to content

Commit

Permalink
Add seen_map
Browse files Browse the repository at this point in the history
  • Loading branch information
Stranger6667 committed Sep 10, 2020
1 parent 448e36a commit 9aace2c
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions src/hypothesis_jsonschema/_canonicalise.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import math
import re
from copy import deepcopy
from typing import Any, Dict, List, NoReturn, Optional, Tuple, Union
from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from urllib.parse import urljoin

import jsonschema
from hypothesis.errors import InvalidArgument
Expand Down Expand Up @@ -567,15 +568,14 @@ def canonicalish(


def resolve_all_refs(
schema: Union[bool, Schema], *, resolver: LocalResolver = None
schema: Union[bool, Schema],
*,
resolver: LocalResolver = None,
seen_map: Dict[str, Set[str]] = None,
) -> Schema:
"""
Resolve all references in the given schema.
This handles nested definitions, but not recursive definitions.
The latter require special handling to convert to strategies and are much
less common, so we just ignore them (and error out) for now.
"""
"""Resolve all non-recursive references in the given schema."""
if seen_map is None:
seen_map = {}
if isinstance(schema, bool):
return canonicalish(schema)
assert isinstance(schema, dict), schema
Expand All @@ -587,41 +587,49 @@ def resolve_all_refs(
)

def is_recursive(reference: str) -> bool:
return reference == "#" or reference in resolver._scopes_stack # type: ignore
full_ref = urljoin(resolver.base_uri, reference) # type: ignore
return reference == "#" or reference in resolver._scopes_stack or full_ref in resolver._scopes_stack # type: ignore

# To avoid infinite recursion, we skip all recursive definitions, and such references will be processed later
# A definition is recursive if it contains a reference to itself or one of its ancestors.
if "$ref" in schema and not is_recursive(schema["$ref"]): # type: ignore
s = dict(schema)
ref = s.pop("$ref")
with resolver.resolving(ref) as got:
if s == {}:
return resolve_all_refs(got, resolver=resolver)
m = merged([s, got], resolver=resolver)
if m is None: # pragma: no cover
msg = f"$ref:{ref!r} had incompatible base schema {s!r}"
raise HypothesisRefResolutionError(msg)
return resolve_all_refs(m, resolver=resolver)
if "$ref" in schema:
path = "-".join(resolver._scopes_stack)
seen_paths = seen_map.setdefault(path, set())
if schema["$ref"] not in seen_paths and not is_recursive(schema["$ref"]): # type: ignore
seen_paths.add(schema["$ref"]) # type: ignore
s = dict(schema)
ref = s.pop("$ref")
with resolver.resolving(ref) as got:
if s == {}:
return resolve_all_refs(got, resolver=resolver, seen_map=seen_map)
m = merged([s, got])
if m is None: # pragma: no cover
msg = f"$ref:{ref!r} had incompatible base schema {s!r}"
raise HypothesisRefResolutionError(msg)

return resolve_all_refs(m, resolver=resolver, seen_map=seen_map)

for key in SCHEMA_KEYS:
val = schema.get(key, False)
if isinstance(val, list):
schema[key] = [
resolve_all_refs(deepcopy(v), resolver=resolver)
resolve_all_refs(deepcopy(v), resolver=resolver, seen_map=seen_map)
if isinstance(v, dict)
else v
for v in val
]
elif isinstance(val, dict):
schema[key] = resolve_all_refs(deepcopy(val), resolver=resolver)
schema[key] = resolve_all_refs(
deepcopy(val), resolver=resolver, seen_map=seen_map
)
else:
assert isinstance(val, bool)
for key in SCHEMA_OBJECT_KEYS: # values are keys-to-schema-dicts, not schemas
if key in schema:
subschema = schema[key]
assert isinstance(subschema, dict)
schema[key] = {
k: resolve_all_refs(deepcopy(v), resolver=resolver)
k: resolve_all_refs(deepcopy(v), resolver=resolver, seen_map=seen_map)
if isinstance(v, dict)
else v
for k, v in subschema.items()
Expand Down

0 comments on commit 9aace2c

Please sign in to comment.