diff --git a/src/spaceone/identity/error/error_user.py b/src/spaceone/identity/error/error_user.py index 4f58f3d7..218629ff 100644 --- a/src/spaceone/identity/error/error_user.py +++ b/src/spaceone/identity/error/error_user.py @@ -57,3 +57,7 @@ class ERROR_UNABLE_TO_RESET_PASSWORD_IN_EXTERNAL_AUTH(ERROR_UNABLE_TO_RESET_PASS class ERROR_UNABLE_TO_RESET_PASSWORD_WITHOUT_EMAIL(ERROR_UNABLE_TO_RESET_PASSWORD): _message = "Unable to reset password without email. (user_id = {user_id})" + + +class ERROR_LAST_ADMIN_CANNOT_DISABLE(ERROR_INVALID_ARGUMENT): + _message = "The last admin cannot be disabled. (user_id = {user_id})" diff --git a/src/spaceone/identity/manager/api_key_manager.py b/src/spaceone/identity/manager/api_key_manager.py index 2d173bd7..42d0f338 100644 --- a/src/spaceone/identity/manager/api_key_manager.py +++ b/src/spaceone/identity/manager/api_key_manager.py @@ -1,11 +1,13 @@ import logging from typing import Tuple +from mongoengine import QuerySet from spaceone.core.cache import cacheable from spaceone.core.manager import BaseManager + from spaceone.identity.lib.key_generator import KeyGenerator -from spaceone.identity.model.api_key.database import APIKey from spaceone.identity.model.app.database import App +from spaceone.identity.model.api_key.database import APIKey from spaceone.identity.model.domain.database import DomainSecret from spaceone.identity.model.user.database import User @@ -103,10 +105,13 @@ def _rollback(old_data): def get_api_key(self, api_key_id: str, domain_id: str) -> APIKey: return self.api_key_model.get(api_key_id=api_key_id, domain_id=domain_id) + def filter_api_keys(self, **conditions) -> QuerySet: + return self.api_key_model.filter(**conditions) + def list_api_keys(self, query: dict) -> Tuple[list, int]: return self.api_key_model.query(**query) - def stat_api_keys(self, query): + def stat_api_keys(self, query: dict) -> dict: return self.api_key_model.stat(**query) @cacheable(key="api-key:{domain_id}", expire=60) diff --git a/src/spaceone/identity/service/api_key_service.py b/src/spaceone/identity/service/api_key_service.py index 0cb91623..ee01334c 100644 --- a/src/spaceone/identity/service/api_key_service.py +++ b/src/spaceone/identity/service/api_key_service.py @@ -46,6 +46,11 @@ def create(self, params: APIKeyCreateRequest) -> Union[APIKeyResponse, dict]: api_key_vo, api_key = self.api_key_mgr.create_api_key_by_user_vo( user_vo, params.dict() ) + api_key_counts = self.api_key_mgr.filter_api_keys( + user_id=user_vo.user_id, domain_id=params.domain_id + ) + print(api_key_counts) + user_mgr.update_user_by_vo({"api_key_count": len(api_key_counts)}, user_vo) return APIKeyResponse(**api_key_vo.to_dict(), api_key=api_key) @transaction @@ -62,7 +67,9 @@ def update(self, params: APIKeyUpdateRequest) -> Union[APIKeyResponse, dict]: APIKeyResponse: """ api_key_vo = self.api_key_mgr.get_api_key(params.api_key_id, params.domain_id) - api_key_vo = self.api_key_mgr.update_api_key_by_vo(params.dict(), api_key_vo) + api_key_vo = self.api_key_mgr.update_api_key_by_vo( + params.dict(exclude_unset=True), api_key_vo + ) return APIKeyResponse(**api_key_vo.to_dict()) @transaction @@ -111,6 +118,14 @@ def delete(self, params: APIKeyDeleteRequest) -> None: None """ api_key_vo = self.api_key_mgr.get_api_key(params.api_key_id, params.domain_id) + if api_key_vo.user_id: + user_mgr = UserManager() + user_vo = user_mgr.get_user(api_key_vo.user_id, params.domain_id) + api_key_counts = self.api_key_mgr.filter_api_keys( + user_id=user_vo.user_id, domain_id=params.domain_id + ) + user_mgr.update_user_by_vo({"api_key_count": len(api_key_counts)}, user_vo) + self.api_key_mgr.delete_api_key_by_vo(api_key_vo) @transaction diff --git a/src/spaceone/identity/service/user_service.py b/src/spaceone/identity/service/user_service.py index db4293c3..0251deed 100644 --- a/src/spaceone/identity/service/user_service.py +++ b/src/spaceone/identity/service/user_service.py @@ -21,7 +21,9 @@ from spaceone.identity.manager.domain_secret_manager import DomainSecretManager from spaceone.identity.manager.role_binding_manager import RoleBindingManager from spaceone.identity.manager.mfa_manager import MFAManager -from spaceone.identity.manager.token_manager.local_token_manager import LocalTokenManager +from spaceone.identity.manager.token_manager.local_token_manager import ( + LocalTokenManager, +) from spaceone.identity.manager.user_manager import UserManager from spaceone.identity.manager.workspace_manager import WorkspaceManager from spaceone.identity.model.user.request import * @@ -78,7 +80,9 @@ def create_user(self, params: dict) -> User: temp_password = self._generate_temporary_password() params["password"] = copy.deepcopy(temp_password) - reset_password_type = config.get_global("RESET_PASSWORD_TYPE", "ACCESS_TOKEN") + reset_password_type = config.get_global( + "RESET_PASSWORD_TYPE", "ACCESS_TOKEN" + ) if reset_password_type == "ACCESS_TOKEN": token = self._issue_temporary_token(user_id, domain_id) @@ -217,7 +221,9 @@ def verify_email(self, params: UserVerifyEmailRequest) -> None: @transaction(append_meta={"authorization.scope": "DOMAIN_OR_USER"}) @convert_model - def confirm_email(self, params: UserConfirmEmailRequest) -> Union[UserResponse, dict]: + def confirm_email( + self, params: UserConfirmEmailRequest + ) -> Union[UserResponse, dict]: """Confirm email Args: @@ -434,8 +440,8 @@ def delete(self, params: UserDeleteRequest) -> None: None """ user_vo = self.user_mgr.get_user(params.user_id, params.domain_id) + self._check_last_admin(user_vo) - # TODO: check this user is last admin (DOMAIN_ADMIN, SYSTEM_ADMIN) self.user_mgr.delete_user_by_vo(user_vo) @transaction(append_meta={"authorization.scope": "DOMAIN"}) @@ -475,7 +481,7 @@ def disable(self, params: UserDisableRequest) -> Union[UserResponse, dict]: user_vo = self.user_mgr.get_user(params.user_id, params.domain_id) user_vo = self.user_mgr.update_user_by_vo({"state": "DISABLED"}, user_vo) - # TODO: check this user is last admin (DOMAIN_ADMIN, SYSTEM_ADMIN) + self._check_last_admin(user_vo) return UserResponse(**user_vo.to_dict()) @@ -499,8 +505,10 @@ def get(self, params: UserGetRequest) -> Union[UserResponse, dict]: @transaction(append_meta={"authorization.scope": "DOMAIN_READ"}) @convert_model - def get_workspaces(self, params: UserWorkspacesRequest) -> Union[WorkspacesResponse, dict]: - """ Find user + def get_workspaces( + self, params: UserWorkspacesRequest + ) -> Union[WorkspacesResponse, dict]: + """Find user Args: params (UserWorkspacesRequest): { 'user_id': 'str', # required @@ -516,12 +524,14 @@ def get_workspaces(self, params: UserWorkspacesRequest) -> Union[WorkspacesRespo user_vo = self.user_mgr.get_user(params.user_id, params.domain_id) - if user_vo.role_type == ['SYSTEM_ADMIN', 'DOMAIN_ADMIN']: + if user_vo.role_type == ["SYSTEM_ADMIN", "DOMAIN_ADMIN"]: allow_all = True else: rb_vos = rb_mgr.filter_role_bindings( - user_id=params.user_id, domain_id=params.domain_id, role_type=['WORKSPACE_OWNER', 'WORKSPACE_MEMBER'], - workspace_id='*' + user_id=params.user_id, + domain_id=params.domain_id, + role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + workspace_id="*", ) if rb_vos.count() > 0: @@ -531,19 +541,25 @@ def get_workspaces(self, params: UserWorkspacesRequest) -> Union[WorkspacesRespo workspace_vos = workspace_mgr.filter_workspaces(domain_id=params.domain_id) else: rb_vos = rb_mgr.filter_role_bindings( - user_id=params.user_id, domain_id=params.domain_id, role_type=['WORKSPACE_OWNER', 'WORKSPACE_MEMBER'] + user_id=params.user_id, + domain_id=params.domain_id, + role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], ) workspace_ids = list(set([rb.workspace_id for rb in rb_vos])) - workspace_vos = workspace_mgr.filter_workspaces(workspace_id=workspace_ids, domain_id=params.domain_id) + workspace_vos = workspace_mgr.filter_workspaces( + workspace_id=workspace_ids, domain_id=params.domain_id + ) workspaces_info = [workspace_vo.to_dict() for workspace_vo in workspace_vos] - return WorkspacesResponse(results=workspaces_info, total_count=len(workspaces_info)) + return WorkspacesResponse( + results=workspaces_info, total_count=len(workspaces_info) + ) @transaction(append_meta={"authorization.scope": "DOMAIN_READ"}) @convert_model def find(self, params: UserFindRequest) -> Union[UsersSummaryResponse, dict]: - """ Find user + """Find user Args: params (UserFindRequest): { 'keyword': 'str', # required @@ -557,15 +573,13 @@ def find(self, params: UserFindRequest) -> Union[UsersSummaryResponse, dict]: """ query = { - "filter": [ - {"k": "domain_id", "v": params.domain_id, "o": "eq"} - ], + "filter": [{"k": "domain_id", "v": params.domain_id, "o": "eq"}], "filter_or": [ {"k": "user_id", "v": params.keyword, "o": "contain"}, - {"k": "name", "v": params.keyword, "o": "contain"} + {"k": "name", "v": params.keyword, "o": "contain"}, ], "page": params.page, - "only": ["user_id", "name", "state"] + "only": ["user_id", "name", "state"], } if params.state: @@ -573,7 +587,9 @@ def find(self, params: UserFindRequest) -> Union[UsersSummaryResponse, dict]: if params.exclude_workspace_id: rb_mgr = RoleBindingManager() - rb_vos = rb_mgr.filter_role_bindings(workspace_id=params.exclude_workspace_id, domain_id=params.domain_id) + rb_vos = rb_mgr.filter_role_bindings( + workspace_id=params.exclude_workspace_id, domain_id=params.domain_id + ) user_ids = list(set([rb.user_id for rb in rb_vos])) query["filter"].append({"k": "user_id", "v": user_ids, "o": "not_in"}) @@ -589,7 +605,6 @@ def find(self, params: UserFindRequest) -> Union[UsersSummaryResponse, dict]: "name", "state", "email", - "user_type", "auth_type", "domain_id", ] @@ -623,7 +638,7 @@ def list(self, params: UserSearchQueryRequest) -> Union[UsersResponse, dict]: @append_keyword_filter(["user_id", "name", "email"]) @convert_model def stat(self, params: UserStatQueryRequest) -> dict: - """ stat users + """stat users Args: params (UserStatQueryRequest): { @@ -671,6 +686,15 @@ def _get_console_url(self, domain_id): console_domain = config.get_global("EMAIL_CONSOLE_DOMAIN") return console_domain.format(domain_name=domain_name) + @staticmethod + def _check_last_admin(user_vo): + rb_mgr = RoleBindingManager() + rb_vos = rb_mgr.filter_role_bindings( + domain_id=user_vo.domain_id, role_type=user_vo.role_type + ) + if rb_vos.count() == 1: + raise ERROR_LAST_ADMIN_CANNOT_DISABLE(user_id=user_vo.user_id) + @staticmethod def _check_reset_password_eligibility(user_id, auth_type, email): if auth_type == "EXTERNAL":