Skip to content

Commit

Permalink
feat: add check last admin when disable and delete user (#73)
Browse files Browse the repository at this point in the history
Signed-off-by: ImMin5 <[email protected]>
  • Loading branch information
ImMin5 committed Dec 7, 2023
1 parent 6ab4cce commit 5d5a0fd
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 25 deletions.
4 changes: 4 additions & 0 deletions src/spaceone/identity/error/error_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,7 @@ class ERROR_UNABLE_TO_RESET_PASSWORD_IN_EXTERNAL_AUTH(ERROR_UNABLE_TO_RESET_PASS

class ERROR_UNABLE_TO_RESET_PASSWORD_WITHOUT_EMAIL(ERROR_UNABLE_TO_RESET_PASSWORD):
_message = "Unable to reset password without email. (user_id = {user_id})"


class ERROR_LAST_ADMIN_CANNOT_DISABLE(ERROR_INVALID_ARGUMENT):
_message = "The last admin cannot be disabled. (user_id = {user_id})"
9 changes: 7 additions & 2 deletions src/spaceone/identity/manager/api_key_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging
from typing import Tuple
from mongoengine import QuerySet

from spaceone.core.cache import cacheable
from spaceone.core.manager import BaseManager

from spaceone.identity.lib.key_generator import KeyGenerator
from spaceone.identity.model.api_key.database import APIKey
from spaceone.identity.model.app.database import App
from spaceone.identity.model.api_key.database import APIKey
from spaceone.identity.model.domain.database import DomainSecret
from spaceone.identity.model.user.database import User

Expand Down Expand Up @@ -103,10 +105,13 @@ def _rollback(old_data):
def get_api_key(self, api_key_id: str, domain_id: str) -> APIKey:
return self.api_key_model.get(api_key_id=api_key_id, domain_id=domain_id)

def filter_api_keys(self, **conditions) -> QuerySet:
return self.api_key_model.filter(**conditions)

def list_api_keys(self, query: dict) -> Tuple[list, int]:
return self.api_key_model.query(**query)

def stat_api_keys(self, query):
def stat_api_keys(self, query: dict) -> dict:
return self.api_key_model.stat(**query)

@cacheable(key="api-key:{domain_id}", expire=60)
Expand Down
17 changes: 16 additions & 1 deletion src/spaceone/identity/service/api_key_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ def create(self, params: APIKeyCreateRequest) -> Union[APIKeyResponse, dict]:
api_key_vo, api_key = self.api_key_mgr.create_api_key_by_user_vo(
user_vo, params.dict()
)
api_key_counts = self.api_key_mgr.filter_api_keys(
user_id=user_vo.user_id, domain_id=params.domain_id
)
print(api_key_counts)
user_mgr.update_user_by_vo({"api_key_count": len(api_key_counts)}, user_vo)
return APIKeyResponse(**api_key_vo.to_dict(), api_key=api_key)

@transaction
Expand All @@ -62,7 +67,9 @@ def update(self, params: APIKeyUpdateRequest) -> Union[APIKeyResponse, dict]:
APIKeyResponse:
"""
api_key_vo = self.api_key_mgr.get_api_key(params.api_key_id, params.domain_id)
api_key_vo = self.api_key_mgr.update_api_key_by_vo(params.dict(), api_key_vo)
api_key_vo = self.api_key_mgr.update_api_key_by_vo(
params.dict(exclude_unset=True), api_key_vo
)
return APIKeyResponse(**api_key_vo.to_dict())

@transaction
Expand Down Expand Up @@ -111,6 +118,14 @@ def delete(self, params: APIKeyDeleteRequest) -> None:
None
"""
api_key_vo = self.api_key_mgr.get_api_key(params.api_key_id, params.domain_id)
if api_key_vo.user_id:
user_mgr = UserManager()
user_vo = user_mgr.get_user(api_key_vo.user_id, params.domain_id)
api_key_counts = self.api_key_mgr.filter_api_keys(
user_id=user_vo.user_id, domain_id=params.domain_id
)
user_mgr.update_user_by_vo({"api_key_count": len(api_key_counts)}, user_vo)

self.api_key_mgr.delete_api_key_by_vo(api_key_vo)

@transaction
Expand Down
68 changes: 46 additions & 22 deletions src/spaceone/identity/service/user_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
from spaceone.identity.manager.domain_secret_manager import DomainSecretManager
from spaceone.identity.manager.role_binding_manager import RoleBindingManager
from spaceone.identity.manager.mfa_manager import MFAManager
from spaceone.identity.manager.token_manager.local_token_manager import LocalTokenManager
from spaceone.identity.manager.token_manager.local_token_manager import (
LocalTokenManager,
)
from spaceone.identity.manager.user_manager import UserManager
from spaceone.identity.manager.workspace_manager import WorkspaceManager
from spaceone.identity.model.user.request import *
Expand Down Expand Up @@ -78,7 +80,9 @@ def create_user(self, params: dict) -> User:

temp_password = self._generate_temporary_password()
params["password"] = copy.deepcopy(temp_password)
reset_password_type = config.get_global("RESET_PASSWORD_TYPE", "ACCESS_TOKEN")
reset_password_type = config.get_global(
"RESET_PASSWORD_TYPE", "ACCESS_TOKEN"
)

if reset_password_type == "ACCESS_TOKEN":
token = self._issue_temporary_token(user_id, domain_id)
Expand Down Expand Up @@ -217,7 +221,9 @@ def verify_email(self, params: UserVerifyEmailRequest) -> None:

@transaction(append_meta={"authorization.scope": "DOMAIN_OR_USER"})
@convert_model
def confirm_email(self, params: UserConfirmEmailRequest) -> Union[UserResponse, dict]:
def confirm_email(
self, params: UserConfirmEmailRequest
) -> Union[UserResponse, dict]:
"""Confirm email
Args:
Expand Down Expand Up @@ -434,8 +440,8 @@ def delete(self, params: UserDeleteRequest) -> None:
None
"""
user_vo = self.user_mgr.get_user(params.user_id, params.domain_id)
self._check_last_admin(user_vo)

# TODO: check this user is last admin (DOMAIN_ADMIN, SYSTEM_ADMIN)
self.user_mgr.delete_user_by_vo(user_vo)

@transaction(append_meta={"authorization.scope": "DOMAIN"})
Expand Down Expand Up @@ -475,7 +481,7 @@ def disable(self, params: UserDisableRequest) -> Union[UserResponse, dict]:
user_vo = self.user_mgr.get_user(params.user_id, params.domain_id)
user_vo = self.user_mgr.update_user_by_vo({"state": "DISABLED"}, user_vo)

# TODO: check this user is last admin (DOMAIN_ADMIN, SYSTEM_ADMIN)
self._check_last_admin(user_vo)

return UserResponse(**user_vo.to_dict())

Expand All @@ -499,8 +505,10 @@ def get(self, params: UserGetRequest) -> Union[UserResponse, dict]:

@transaction(append_meta={"authorization.scope": "DOMAIN_READ"})
@convert_model
def get_workspaces(self, params: UserWorkspacesRequest) -> Union[WorkspacesResponse, dict]:
""" Find user
def get_workspaces(
self, params: UserWorkspacesRequest
) -> Union[WorkspacesResponse, dict]:
"""Find user
Args:
params (UserWorkspacesRequest): {
'user_id': 'str', # required
Expand All @@ -516,12 +524,14 @@ def get_workspaces(self, params: UserWorkspacesRequest) -> Union[WorkspacesRespo

user_vo = self.user_mgr.get_user(params.user_id, params.domain_id)

if user_vo.role_type == ['SYSTEM_ADMIN', 'DOMAIN_ADMIN']:
if user_vo.role_type == ["SYSTEM_ADMIN", "DOMAIN_ADMIN"]:
allow_all = True
else:
rb_vos = rb_mgr.filter_role_bindings(
user_id=params.user_id, domain_id=params.domain_id, role_type=['WORKSPACE_OWNER', 'WORKSPACE_MEMBER'],
workspace_id='*'
user_id=params.user_id,
domain_id=params.domain_id,
role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"],
workspace_id="*",
)

if rb_vos.count() > 0:
Expand All @@ -531,19 +541,25 @@ def get_workspaces(self, params: UserWorkspacesRequest) -> Union[WorkspacesRespo
workspace_vos = workspace_mgr.filter_workspaces(domain_id=params.domain_id)
else:
rb_vos = rb_mgr.filter_role_bindings(
user_id=params.user_id, domain_id=params.domain_id, role_type=['WORKSPACE_OWNER', 'WORKSPACE_MEMBER']
user_id=params.user_id,
domain_id=params.domain_id,
role_type=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"],
)

workspace_ids = list(set([rb.workspace_id for rb in rb_vos]))
workspace_vos = workspace_mgr.filter_workspaces(workspace_id=workspace_ids, domain_id=params.domain_id)
workspace_vos = workspace_mgr.filter_workspaces(
workspace_id=workspace_ids, domain_id=params.domain_id
)

workspaces_info = [workspace_vo.to_dict() for workspace_vo in workspace_vos]
return WorkspacesResponse(results=workspaces_info, total_count=len(workspaces_info))
return WorkspacesResponse(
results=workspaces_info, total_count=len(workspaces_info)
)

@transaction(append_meta={"authorization.scope": "DOMAIN_READ"})
@convert_model
def find(self, params: UserFindRequest) -> Union[UsersSummaryResponse, dict]:
""" Find user
"""Find user
Args:
params (UserFindRequest): {
'keyword': 'str', # required
Expand All @@ -557,23 +573,23 @@ def find(self, params: UserFindRequest) -> Union[UsersSummaryResponse, dict]:
"""

query = {
"filter": [
{"k": "domain_id", "v": params.domain_id, "o": "eq"}
],
"filter": [{"k": "domain_id", "v": params.domain_id, "o": "eq"}],
"filter_or": [
{"k": "user_id", "v": params.keyword, "o": "contain"},
{"k": "name", "v": params.keyword, "o": "contain"}
{"k": "name", "v": params.keyword, "o": "contain"},
],
"page": params.page,
"only": ["user_id", "name", "state"]
"only": ["user_id", "name", "state"],
}

if params.state:
query["filter"].append({"k": "state", "v": params.state, "o": "eq"})

if params.exclude_workspace_id:
rb_mgr = RoleBindingManager()
rb_vos = rb_mgr.filter_role_bindings(workspace_id=params.exclude_workspace_id, domain_id=params.domain_id)
rb_vos = rb_mgr.filter_role_bindings(
workspace_id=params.exclude_workspace_id, domain_id=params.domain_id
)
user_ids = list(set([rb.user_id for rb in rb_vos]))
query["filter"].append({"k": "user_id", "v": user_ids, "o": "not_in"})

Expand All @@ -589,7 +605,6 @@ def find(self, params: UserFindRequest) -> Union[UsersSummaryResponse, dict]:
"name",
"state",
"email",
"user_type",
"auth_type",
"domain_id",
]
Expand Down Expand Up @@ -623,7 +638,7 @@ def list(self, params: UserSearchQueryRequest) -> Union[UsersResponse, dict]:
@append_keyword_filter(["user_id", "name", "email"])
@convert_model
def stat(self, params: UserStatQueryRequest) -> dict:
""" stat users
"""stat users
Args:
params (UserStatQueryRequest): {
Expand Down Expand Up @@ -671,6 +686,15 @@ def _get_console_url(self, domain_id):
console_domain = config.get_global("EMAIL_CONSOLE_DOMAIN")
return console_domain.format(domain_name=domain_name)

@staticmethod
def _check_last_admin(user_vo):
rb_mgr = RoleBindingManager()
rb_vos = rb_mgr.filter_role_bindings(
domain_id=user_vo.domain_id, role_type=user_vo.role_type
)
if rb_vos.count() == 1:
raise ERROR_LAST_ADMIN_CANNOT_DISABLE(user_id=user_vo.user_id)

@staticmethod
def _check_reset_password_eligibility(user_id, auth_type, email):
if auth_type == "EXTERNAL":
Expand Down

0 comments on commit 5d5a0fd

Please sign in to comment.