Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update MFA requirement to prevent enforcement for EXTERNAL auth_type with SAML protocol #406

Merged
merged 2 commits into from
Nov 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 29 additions & 19 deletions src/spaceone/identity/service/token_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from spaceone.identity.error.error_workspace import ERROR_WORKSPACE_STATE
from spaceone.identity.manager.app_manager import AppManager
from spaceone.identity.manager.domain_manager import DomainManager
from spaceone.identity.manager.external_auth_manager import ExternalAuthManager
from spaceone.identity.manager import SecretManager
from spaceone.identity.manager.domain_secret_manager import DomainSecretManager
from spaceone.identity.manager.mfa_manager.base import MFAManager
Expand All @@ -24,6 +25,7 @@
from spaceone.identity.manager.user_manager import UserManager
from spaceone.identity.manager.workspace_manager import WorkspaceManager
from spaceone.identity.model.app.database import App
from spaceone.identity.model.domain.database import Domain
from spaceone.identity.model.token.request import *
from spaceone.identity.model.token.response import *
from spaceone.identity.model.user.database import User
Expand Down Expand Up @@ -83,32 +85,29 @@ def issue(self, params: TokenIssueRequest) -> Union[TokenResponse, dict]:

user_vo = token_mgr.user
user_mfa = user_vo.mfa.to_dict() if user_vo.mfa else {}
mfa_type = user_mfa.get("mfa_type")
mfa_type = user_mfa.get('mfa_type')
permissions = self._get_permissions_from_required_actions(user_vo)

mfa_user_id = user_vo.user_id

if user_mfa.get("state", "DISABLED") == "ENABLED" and params.auth_type != "MFA":
mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type)
if mfa_type == "EMAIL":
mfa_email = user_mfa["options"].get("email")
mfa_manager.send_mfa_authentication_email(
user_vo.user_id, domain_id, mfa_email, user_vo.language, credentials
)
mfa_user_id = mfa_email
if self._check_login_protocol_with_user_auth_type(params.auth_type, domain_id):
if user_mfa.get("state", "DISABLED") == "ENABLED" and params.auth_type != "MFA":
mfa_manager = MFAManager.get_manager_by_mfa_type(mfa_type)
if mfa_type == "EMAIL":
mfa_email = user_mfa["options"].get("email")
mfa_manager.send_mfa_authentication_email(
user_vo.user_id, domain_id, mfa_email, user_vo.language, credentials
)
mfa_user_id = mfa_email

elif mfa_type == "OTP":
secret_manager: SecretManager = self.locator.get_manager(SecretManager)
user_secret_id = user_mfa["options"].get("user_secret_id")
otp_secret_key = secret_manager.get_user_otp_secret_key(
user_secret_id, domain_id
)
elif mfa_type == "OTP":
secret_manager: SecretManager = self.locator.get_manager(SecretManager)
user_secret_id = user_mfa["options"].get("user_secret_id")
otp_secret_key = secret_manager.get_user_otp_secret_key(user_secret_id, domain_id)

mfa_manager.set_cache_otp_mfa_secret_key(
otp_secret_key, user_vo.user_id, domain_id, credentials
)
mfa_manager.set_cache_otp_mfa_secret_key(otp_secret_key, user_vo.user_id, domain_id, credentials)

raise ERROR_MFA_REQUIRED(user_id=mfa_user_id, mfa_type=mfa_type)
raise ERROR_MFA_REQUIRED(user_id=mfa_user_id, mfa_type=mfa_type)

token_info = token_mgr.issue_token(
private_jwk,
Expand Down Expand Up @@ -397,6 +396,17 @@ def _get_user_projects(

return user_projects

def _check_login_protocol_with_user_auth_type(self, user_auth_type: str, domain_id: str) -> bool:
if user_auth_type == "EXTERNAL":
domain: Domain = self.domain_mgr.get_domain(domain_id)
external_auth_mgr = ExternalAuthManager()
external_metadata_protocol = external_auth_mgr.get_auth_info(domain).get('metadata', {}).get('protocol')

if external_metadata_protocol == "saml":
return False

return True

@staticmethod
def _check_user_required_actions(required_actions: list, user_id: str) -> None:
if required_actions:
Expand Down
Loading