Skip to content

Commit

Permalink
Add seen_map
Browse files Browse the repository at this point in the history
  • Loading branch information
Stranger6667 committed Sep 10, 2020
1 parent 448e36a commit c8f78f9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 27 deletions.
60 changes: 34 additions & 26 deletions src/hypothesis_jsonschema/_canonicalise.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
import math
import re
from copy import deepcopy
from typing import Any, Dict, List, NoReturn, Optional, Tuple, Union
from typing import Any, Dict, List, NoReturn, Optional, Set, Tuple, Union
from urllib.parse import urljoin

import jsonschema
from hypothesis.errors import InvalidArgument
Expand Down Expand Up @@ -85,9 +86,9 @@ def _get_validator_class(schema: Schema) -> JSONSchemaValidator:
return validator


def make_validator(schema: Schema) -> JSONSchemaValidator:
def make_validator(schema: Schema, resolver: LocalResolver) -> JSONSchemaValidator:
validator = _get_validator_class(schema)
return validator(schema)
return validator(schema, resolver=resolver)


class HypothesisRefResolutionError(jsonschema.exceptions.RefResolutionError):
Expand Down Expand Up @@ -567,15 +568,14 @@ def canonicalish(


def resolve_all_refs(
schema: Union[bool, Schema], *, resolver: LocalResolver = None
schema: Union[bool, Schema],
*,
resolver: LocalResolver = None,
seen_map: Dict[str, Set[str]] = None,
) -> Schema:
"""
Resolve all references in the given schema.
This handles nested definitions, but not recursive definitions.
The latter require special handling to convert to strategies and are much
less common, so we just ignore them (and error out) for now.
"""
"""Resolve all non-recursive references in the given schema."""
if seen_map is None:
seen_map = {}
if isinstance(schema, bool):
return canonicalish(schema)
assert isinstance(schema, dict), schema
Expand All @@ -587,41 +587,49 @@ def resolve_all_refs(
)

def is_recursive(reference: str) -> bool:
return reference == "#" or reference in resolver._scopes_stack # type: ignore
full_ref = urljoin(resolver.base_uri, reference) # type: ignore
return reference == "#" or reference in resolver._scopes_stack or full_ref in resolver._scopes_stack # type: ignore

# To avoid infinite recursion, we skip all recursive definitions, and such references will be processed later
# A definition is recursive if it contains a reference to itself or one of its ancestors.
if "$ref" in schema and not is_recursive(schema["$ref"]): # type: ignore
s = dict(schema)
ref = s.pop("$ref")
with resolver.resolving(ref) as got:
if s == {}:
return resolve_all_refs(got, resolver=resolver)
m = merged([s, got], resolver=resolver)
if m is None: # pragma: no cover
msg = f"$ref:{ref!r} had incompatible base schema {s!r}"
raise HypothesisRefResolutionError(msg)
return resolve_all_refs(m, resolver=resolver)
if "$ref" in schema:
path = "-".join(resolver._scopes_stack)
seen_paths = seen_map.setdefault(path, set())
if schema["$ref"] not in seen_paths and not is_recursive(schema["$ref"]): # type: ignore
seen_paths.add(schema["$ref"]) # type: ignore
s = dict(schema)
ref = s.pop("$ref")
with resolver.resolving(ref) as got:
if s == {}:
return resolve_all_refs(got, resolver=resolver, seen_map=seen_map)
m = merged([s, got])
if m is None: # pragma: no cover
msg = f"$ref:{ref!r} had incompatible base schema {s!r}"
raise HypothesisRefResolutionError(msg)

return resolve_all_refs(m, resolver=resolver, seen_map=seen_map)

for key in SCHEMA_KEYS:
val = schema.get(key, False)
if isinstance(val, list):
schema[key] = [
resolve_all_refs(deepcopy(v), resolver=resolver)
resolve_all_refs(deepcopy(v), resolver=resolver, seen_map=seen_map)
if isinstance(v, dict)
else v
for v in val
]
elif isinstance(val, dict):
schema[key] = resolve_all_refs(deepcopy(val), resolver=resolver)
schema[key] = resolve_all_refs(
deepcopy(val), resolver=resolver, seen_map=seen_map
)
else:
assert isinstance(val, bool)
for key in SCHEMA_OBJECT_KEYS: # values are keys-to-schema-dicts, not schemas
if key in schema:
subschema = schema[key]
assert isinstance(subschema, dict)
schema[key] = {
k: resolve_all_refs(deepcopy(v), resolver=resolver)
k: resolve_all_refs(deepcopy(v), resolver=resolver, seen_map=seen_map)
if isinstance(v, dict)
else v
for k, v in subschema.items()
Expand Down
1 change: 0 additions & 1 deletion src/hypothesis_jsonschema/_from_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
FALSEY,
TRUTHY,
TYPE_STRINGS,
JSONType,
LocalResolver,
Schema,
canonicalish,
Expand Down

0 comments on commit c8f78f9

Please sign in to comment.