Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 68 additions & 21 deletions ninja/signature/details.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,28 @@ def _create_models(self) -> TModels:
return result

def _args_flatten_map(self, args: List[FuncParam]) -> Dict[str, Tuple[str, ...]]:
flatten_map = {}
flatten_map: Dict[str, Tuple[str, ...]] = {}
arg_names: Any = {}
for arg in args:
# Check if this is an optional union type with None default
if get_origin(arg.annotation) in UNION_TYPES:
union_args = get_args(arg.annotation)
has_none = type(None) in union_args
# If it's a union with None and the source default is None (like Query(None)), don't flatten it
if (
has_none
and hasattr(arg.source, "default")
and arg.source.default is None
):
name = arg.alias
if name in flatten_map:
raise ConfigError(
f"Duplicated name: '{name}' also in '{arg_names[name]}'"
)
flatten_map[name] = (name,)
arg_names[name] = name
continue

if is_pydantic_model(arg.annotation):
for name, path in self._model_flatten_map(arg.annotation, arg.alias):
if name in flatten_map:
Expand All @@ -207,12 +226,19 @@ def _args_flatten_map(self, args: List[FuncParam]) -> Dict[str, Tuple[str, ...]]

def _model_flatten_map(self, model: TModel, prefix: str) -> Generator:
field: FieldInfo
for attr, field in model.model_fields.items():
field_name = field.alias or attr
name = f"{prefix}{self.FLATTEN_PATH_SEP}{field_name}"
if is_pydantic_model(field.annotation):
yield from self._model_flatten_map(field.annotation, name) # type: ignore
else:
if get_origin(model) in UNION_TYPES:
# If the model is a union type, process each type in the union
for arg in get_args(model):
yield from self._model_flatten_map(arg, prefix)
else:
for attr, field in model.model_fields.items():
field_name = field.alias or attr
name = f"{prefix}{self.FLATTEN_PATH_SEP}{field_name}"

if get_origin(
field.annotation
) not in UNION_TYPES and is_pydantic_model(field.annotation):
yield from self._model_flatten_map(field.annotation, name) # type: ignore
yield field_name, name

def _get_param_type(self, name: str, arg: inspect.Parameter) -> FuncParam:
Expand Down Expand Up @@ -260,15 +286,21 @@ def _get_param_type(self, name: str, arg: inspect.Parameter) -> FuncParam:

# 2) if param name is a part of the path parameter
elif name in self.path_params_names:
assert (
default == self.signature.empty
), f"'{name}' is a path param, default not allowed"
assert default == self.signature.empty, (
f"'{name}' is a path param, default not allowed"
)
param_source = Path(...)

# 3) if param is a collection, or annotation is part of pydantic model:
elif is_collection or is_pydantic_model(annotation):
if default == self.signature.empty:
param_source = Body(...)
# Check if this is a Union type that includes None - if so, None should be a valid value
if get_origin(annotation) in UNION_TYPES and type(None) in get_args(
annotation
):
param_source = Body(None) # Make it optional with None default
else:
param_source = Body(...)
else:
param_source = Body(default)

Expand All @@ -295,7 +327,11 @@ def is_pydantic_model(cls: Any) -> bool:

# Handle Union types
if origin in UNION_TYPES:
return any(issubclass(arg, pydantic.BaseModel) for arg in get_args(cls))
return any(
issubclass(arg, pydantic.BaseModel)
for arg in get_args(cls)
if arg is not type(None)
)
return issubclass(cls, pydantic.BaseModel)
except TypeError: # pragma: no cover
return False
Expand Down Expand Up @@ -338,20 +374,31 @@ def detect_collection_fields(
for attr in path[1:]:
if hasattr(annotation_or_field, "annotation"):
annotation_or_field = annotation_or_field.annotation
annotation_or_field = next(
(
a
for a in annotation_or_field.model_fields.values()
if a.alias == attr
),
annotation_or_field.model_fields.get(attr),
) # pragma: no cover

# check union types
if get_origin(annotation_or_field) in UNION_TYPES:
for arg in get_args(annotation_or_field): # pragma: no branch
found_field = next(
(a for a in arg.model_fields.values() if a.alias == attr),
arg.model_fields.get(attr),
)
if found_field is not None:
annotation_or_field = found_field
break
else:
annotation_or_field = next(
(
a
for a in annotation_or_field.model_fields.values()
if a.alias == attr
),
annotation_or_field.model_fields.get(attr),
)

annotation_or_field = getattr(
annotation_or_field, "outer_type_", annotation_or_field
)

# if hasattr(annotation_or_field, "annotation"):
annotation_or_field = annotation_or_field.annotation

if is_collection_type(annotation_or_field):
Expand Down
2 changes: 1 addition & 1 deletion ninja/testing/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def __init__(self, http_response: Union[HttpResponse, StreamingHttpResponse]):
if self.streaming:
self.content = b"".join(http_response.streaming_content) # type: ignore
else:
self.content = http_response.content # type: ignore[union-attr]
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mypy was complaining about this ignore being unnecessary for me

self.content = http_response.content
self._data = None

def json(self) -> Any:
Expand Down
4 changes: 4 additions & 0 deletions tests/test_misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ def test_is_pydantic_model():
class Model(BaseModel):
x: int

class ModelNone(BaseModel):
x: int | None

assert is_pydantic_model(Model)
assert is_pydantic_model("instance") is False
assert is_pydantic_model(ModelNone)


def test_client():
Expand Down
Loading