diff --git a/src/spaceone/identity/service/token_service.py b/src/spaceone/identity/service/token_service.py index 47f9673..cbef34c 100644 --- a/src/spaceone/identity/service/token_service.py +++ b/src/spaceone/identity/service/token_service.py @@ -12,6 +12,7 @@ from spaceone.identity.error.error_workspace import ERROR_WORKSPACE_STATE from spaceone.identity.manager.app_manager import AppManager from spaceone.identity.manager.domain_manager import DomainManager +from spaceone.identity.manager.external_auth_manager import ExternalAuthManager from spaceone.identity.manager import SecretManager from spaceone.identity.manager.domain_secret_manager import DomainSecretManager from spaceone.identity.manager.mfa_manager.base import MFAManager @@ -24,6 +25,7 @@ from spaceone.identity.manager.user_manager import UserManager from spaceone.identity.manager.workspace_manager import WorkspaceManager from spaceone.identity.model.app.database import App +from spaceone.identity.model.domain.database import Domain from spaceone.identity.model.token.request import * from spaceone.identity.model.token.response import * from spaceone.identity.model.user.database import User @@ -83,32 +85,29 @@ def issue(self, params: TokenIssueRequest) -> Union[TokenResponse, dict]: user_vo = token_mgr.user user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {} - mfa_type = user_mfa.get("mfa_type") + mfa_type = user_mfa.get('mfa_type') permissions = self._get_permissions_from_required_actions(user_vo) mfa_user_id = user_vo.user_id - if user_mfa.get("state", "DISABLED") == "ENABLED" and params.auth_type != "MFA": - mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type) - if mfa_type == "EMAIL": - mfa_email = user_mfa["options"].get("email") - mfa_manager.send_mfa_authentication_email( - user_vo.user_id, domain_id, mfa_email, user_vo.language, credentials - ) - mfa_user_id = mfa_email + if self._check_login_protocol_with_user_auth_type(params.auth_type, domain_id): + if user_mfa.get("state", "DISABLED") == "ENABLED" and params.auth_type != "MFA": + mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type) + if mfa_type == "EMAIL": + mfa_email = user_mfa["options"].get("email") + mfa_manager.send_mfa_authentication_email( + user_vo.user_id, domain_id, mfa_email, user_vo.language, credentials + ) + mfa_user_id = mfa_email - elif mfa_type == "OTP": - secret_manager: SecretManager = self.locator.get_manager(SecretManager) - user_secret_id = user_mfa["options"].get("user_secret_id") - otp_secret_key = secret_manager.get_user_otp_secret_key( - user_secret_id, domain_id - ) + elif mfa_type == "OTP": + secret_manager: SecretManager = self.locator.get_manager(SecretManager) + user_secret_id = user_mfa["options"].get("user_secret_id") + otp_secret_key = secret_manager.get_user_otp_secret_key(user_secret_id, domain_id) - mfa_manager.set_cache_otp_mfa_secret_key( - otp_secret_key, user_vo.user_id, domain_id, credentials - ) + mfa_manager.set_cache_otp_mfa_secret_key(otp_secret_key, user_vo.user_id, domain_id, credentials) - raise ERROR_MFA_REQUIRED(user_id=mfa_user_id, mfa_type=mfa_type) + raise ERROR_MFA_REQUIRED(user_id=mfa_user_id, mfa_type=mfa_type) token_info = token_mgr.issue_token( private_jwk, @@ -397,6 +396,17 @@ def _get_user_projects( return user_projects + def _check_login_protocol_with_user_auth_type(self, user_auth_type: str, domain_id: str) -> bool: + if user_auth_type == "EXTERNAL": + domain: Domain = self.domain_mgr.get_domain(domain_id) + external_auth_mgr = ExternalAuthManager() + external_metadata_protocol = external_auth_mgr.get_auth_info(domain).get('metadata', {}).get('protocol') + + if external_metadata_protocol == "saml": + return False + + return True + @staticmethod def _check_user_required_actions(required_actions: list, user_id: str) -> None: if required_actions: