diff --git a/src/spaceone/identity/service/user_profile_service.py b/src/spaceone/identity/service/user_profile_service.py index 561ae832..7b81556a 100644 --- a/src/spaceone/identity/service/user_profile_service.py +++ b/src/spaceone/identity/service/user_profile_service.py @@ -4,6 +4,7 @@ import string from typing import Dict, List, Union +from mongoengine import QuerySet from spaceone.core import config from spaceone.core.service import * from spaceone.core.service.utils import * @@ -22,6 +23,7 @@ from spaceone.identity.manager.user_manager import UserManager from spaceone.identity.manager.workspace_group_manager import WorkspaceGroupManager from spaceone.identity.manager.workspace_manager import WorkspaceManager +from spaceone.identity.model import WorkspaceGroup from spaceone.identity.model.user.database import User from spaceone.identity.model.user.response import * from spaceone.identity.model.user_profile.request import * @@ -416,75 +418,28 @@ def get_workspace_groups( ) -> Union[MyWorkspaceGroupsResponse, dict]: """Find user Args: - params (UserWorkspacesRequest): { - 'user_id': 'str', # injected from auth (required) - 'domain_id': 'str' # injected from auth (required) + params (UserProfileGetWorkspaceGroupsRequest): { + 'user_id': 'str', # injected from auth (required) + 'domain_id': 'str' # injected from auth (required) } Returns: - MyWorkspaceResponse: + MyWorkspaceGroupsResponse: """ - rb_mgr = RoleBindingManager() - allow_all = False - user_vo = self.user_mgr.get_user(params.user_id, params.domain_id) + allow_all = user_vo.role_type == "DOMAIN_ADMIN" - if user_vo.role_type == "DOMAIN_ADMIN": - allow_all = True - - if allow_all: - workspace_group_vos = self.workspace_group_mgr.filter_workspace_groups( - domain_id=params.domain_id - ) - workspace_group_infos = [ - workspace_group_vo.to_dict() - for workspace_group_vo in workspace_group_vos - ] - else: - query_filter = { - "filter": [ - {"key": "users.user_id", "value": params.user_id, "operator": "eq"}, - {"key": "domain_id", "value": params.domain_id, "operator": "eq"}, - ] - } - workspace_group_infos, _ = self.workspace_group_mgr.list_workspace_groups( - query_filter - ) - + workspace_group_infos = self._get_workspace_group_infos(params, allow_all) workspace_group_ids = [ - workspace_group_info["workspace_group_id"] - for workspace_group_info in workspace_group_infos + info["workspace_group_id"] for info in workspace_group_infos ] - - rb_vos = rb_mgr.filter_role_bindings( - user_id=params.user_id, - domain_id=params.domain_id, - workspace_group_id=workspace_group_ids, - role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + role_bindings_info_map = self._get_role_bindings_info( + params, workspace_group_ids ) - role_bindings_info_map = {rb.workspace_group_id: rb.to_dict() for rb in rb_vos} - - workspace_group_user_ids = [] - for workspace_group_info in workspace_group_infos: - if not isinstance(workspace_group_info, dict): - workspace_group_info = workspace_group_info.to_dict() - if users := workspace_group_info.get("users", []) or []: - for user in users: - if isinstance(user, dict): - workspace_group_user_ids.append(user.get("user_id")) - elif hasattr(user, "user_id"): - workspace_group_user_ids.append(user.user_id) - - workspace_groups_info = [] - for workspace_group_info in workspace_group_infos: - workspace_group_dict = ( - self.workspace_group_svc.add_user_name_and_state_to_users( - workspace_group_user_ids, - workspace_group_info, - params.domain_id, - ) - ) - workspace_groups_info.append(workspace_group_dict) + workspace_group_user_ids = self._extract_user_ids(workspace_group_infos) + workspace_groups_info = self._add_user_name_and_state( + workspace_group_infos, params.domain_id, workspace_group_user_ids + ) my_workspace_groups_info = self._get_my_workspace_groups_info( workspace_groups_info, role_bindings_info_map ) @@ -573,9 +528,72 @@ def _get_my_workspaces_info( my_workspaces_info.append(workspace_info) return my_workspaces_info + def _get_workspace_group_infos( + self, params: UserProfileGetWorkspaceGroupsRequest, allow_all: bool + ) -> Union[QuerySet, List[Dict[str, str]]]: + if allow_all: + workspace_group_vos = self.workspace_group_mgr.filter_workspace_groups( + domain_id=params.domain_id + ) + return [vo.to_dict() for vo in workspace_group_vos] + else: + query_filter = { + "filter": [ + {"key": "users.user_id", "value": params.user_id, "operator": "eq"}, + {"key": "domain_id", "value": params.domain_id, "operator": "eq"}, + ] + } + return self.workspace_group_mgr.list_workspace_groups(query_filter)[0] + + @staticmethod + def _get_role_bindings_info( + params: UserProfileGetWorkspaceGroupsRequest, workspace_group_ids: List[str] + ) -> Dict[str, Dict[str, str]]: + rb_mgr = RoleBindingManager() + rb_vos = rb_mgr.filter_role_bindings( + user_id=params.user_id, + domain_id=params.domain_id, + workspace_group_id=workspace_group_ids, + role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + return {rb.workspace_group_id: rb.to_dict() for rb in rb_vos} + + @staticmethod + def _extract_user_ids( + workspace_group_infos: Union[QuerySet, List[Dict[str, str]]] + ) -> List[str]: + workspace_group_user_ids = [] + for workspace_group_info in workspace_group_infos: + if not isinstance(workspace_group_info, dict): + workspace_group_info = workspace_group_info.to_dict() + if users := workspace_group_info.get("users", []) or []: + for user in users: + if isinstance(user, dict): + workspace_group_user_ids.append(user.get("user_id")) + elif hasattr(user, "user_id"): + workspace_group_user_ids.append(user.user_id) + + return workspace_group_user_ids + + def _add_user_name_and_state( + self, + workspace_group_infos: Union[QuerySet, List[Dict[str, str]]], + domain_id: str, + workspace_group_user_ids: List[str], + ) -> List[Union[WorkspaceGroup, Dict[str, str]]]: + updated_workspace_group_infos = [] + for workspace_group_info in workspace_group_infos: + updated_workspace_group_infos.append( + self.workspace_group_svc.add_user_name_and_state_to_users( + workspace_group_info, domain_id, workspace_group_user_ids + ) + ) + return updated_workspace_group_infos + @staticmethod def _get_my_workspace_groups_info( - workspace_groups_info: list, role_bindings_info_map: dict = None + workspace_groups_info: List[Union[WorkspaceGroup, Dict[str, str]]], + role_bindings_info_map: Dict[str, Dict[str, str]] = None, ) -> List[Dict[str, str]]: my_workspace_groups_info = [] diff --git a/src/spaceone/identity/service/workspace_group_service.py b/src/spaceone/identity/service/workspace_group_service.py index 5efd65d3..3f35a9ea 100644 --- a/src/spaceone/identity/service/workspace_group_service.py +++ b/src/spaceone/identity/service/workspace_group_service.py @@ -1,8 +1,8 @@ import logging from datetime import datetime -from mongoengine import QuerySet -from typing import Dict, List, Union, Any +from typing import Any, Dict, List, Union +from mongoengine import QuerySet from spaceone.core.error import ERROR_INVALID_PARAMETER, ERROR_NOT_FOUND from spaceone.core.service import ( BaseService, @@ -244,7 +244,7 @@ def get( ) ) workspace_group_info = self.add_user_name_and_state_to_users( - workspace_group_user_ids, workspace_group_vo, params.domain_id + workspace_group_vo, params.domain_id, workspace_group_user_ids ) return WorkspaceGroupResponse(**workspace_group_info) @@ -360,7 +360,7 @@ def process_add_users( workspace_group_user_ids: List[str] = old_user_ids + new_user_ids workspace_group_user_info = self.add_user_name_and_state_to_users( - workspace_group_user_ids, workspace_group_vo, domain_id + workspace_group_vo, domain_id, workspace_group_user_ids ) return WorkspaceGroupResponse(**workspace_group_user_info) @@ -571,17 +571,17 @@ def add_user(user_info, workspace_group_workspace_id=None): def add_user_name_and_state_to_users( self, - workspace_group_user_ids: List[str], workspace_group_info: Union[WorkspaceGroup, Dict[str, Any]], domain_id: str, + workspace_group_user_ids: List[str], ) -> Dict[str, Any]: """Add user's name and state to users in workspace group. Since the user's name and state are not in user of workspace group in database, we need to add user's name and state to users in the Application layer. Args: - workspace_group_user_ids: 'List[str]' workspace_group_info: 'Union[WorkspaceGroup, Dict[str, Any]]' domain_id: 'str' + workspace_group_user_ids: 'List[str]' Returns: workspace_group_info: 'Dict[str, Any]' """ @@ -711,7 +711,7 @@ def get_workspace_groups_info( workspace_group_vo ) workspace_group_dict = self.add_user_name_and_state_to_users( - workspace_group_user_ids, workspace_group_vo, domain_id + workspace_group_vo, domain_id, workspace_group_user_ids ) workspace_groups_info.append(workspace_group_dict) return workspace_groups_info diff --git a/src/spaceone/identity/service/workspace_group_user_service.py b/src/spaceone/identity/service/workspace_group_user_service.py index 29a2844e..c1611b24 100644 --- a/src/spaceone/identity/service/workspace_group_user_service.py +++ b/src/spaceone/identity/service/workspace_group_user_service.py @@ -1,5 +1,5 @@ import logging -from typing import List, Union +from typing import Union from spaceone.core.service import ( BaseService, @@ -202,7 +202,7 @@ def get( workspace_group_dict = ( self.workspace_group_svc.add_user_name_and_state_to_users( - workspace_group_user_ids, workspace_group_vo, params.domain_id + workspace_group_vo, params.domain_id, workspace_group_user_ids ) ) return WorkspaceGroupResponse(**workspace_group_dict)