diff --git a/src/sentry/api/endpoints/user_details.py b/src/sentry/api/endpoints/user_details.py index 0c76c4c2e91837..1f4a52f24c4393 100644 --- a/src/sentry/api/endpoints/user_details.py +++ b/src/sentry/api/endpoints/user_details.py @@ -13,13 +13,13 @@ from sentry import roles from sentry.api.api_publish_status import ApiPublishStatus from sentry.api.base import control_silo_endpoint -from sentry.api.bases.user import UserEndpoint +from sentry.api.bases.user import UserAndStaffPermission, UserEndpoint from sentry.api.decorators import sudo_required from sentry.api.endpoints.organization_details import post_org_pending_deletion from sentry.api.serializers import serialize from sentry.api.serializers.models.user import DetailedSelfUserSerializer from sentry.api.serializers.rest_framework import CamelSnakeModelSerializer -from sentry.auth.superuser import is_active_superuser +from sentry.auth.elevated_mode import has_elevated_mode from sentry.constants import LANGUAGES from sentry.models.options.user_option import UserOption from sentry.models.organization import OrganizationStatus @@ -154,6 +154,8 @@ class UserDetailsEndpoint(UserEndpoint): "PUT": ApiPublishStatus.UNKNOWN, } + permission_classes = (UserAndStaffPermission,) + def get(self, request: Request, user) -> Response: """ Retrieve User Details @@ -182,7 +184,13 @@ def put(self, request: Request, user) -> Response: :param string default_issue_event: Event displayed by default, "recommended", "latest" or "oldest" :auth: required """ - if not request.access.has_permission("users.admin"): + # We want to prevent superusers from setting users to superuser or staff + # because this is only done through _admin. This will always be enforced + # once the feature flag is removed. + can_elevate_user = has_elevated_mode(request) and request.access.has_permission( + "users.admin" + ) + if not can_elevate_user: if not user.is_superuser and request.data.get("isSuperuser"): return Response( {"detail": "Missing required permission to add superuser."}, @@ -194,11 +202,13 @@ def put(self, request: Request, user) -> Response: status=status.HTTP_403_FORBIDDEN, ) - if request.access.has_permission("users.admin"): + if can_elevate_user: serializer_cls = PrivilegedUserSerializer - # with superuser read write separation, superuser read cannot hit this endpoint - # so we can keep this as is_active_superuser - elif is_active_superuser(request): + # With superuser read/write separation, superuser read cannot hit this endpoint + # so we can keep this as is_active_superuser. Once the feature flag is + # removed and we only check is_active_staff, we can remove this comment. + elif has_elevated_mode(request): + # TODO(schew2381): Rename to staff serializer serializer_cls = SuperuserUserSerializer else: serializer_cls = UserSerializer @@ -257,7 +267,6 @@ def delete(self, request: Request, user) -> Response: :param list organizations: List of organization ids to remove :auth required: """ - serializer = DeleteUserSerializer(data=request.data) if not serializer.is_valid(): @@ -328,9 +337,12 @@ def delete(self, request: Request, user) -> Response: } hard_delete = serializer.validated_data.get("hardDelete", False) + can_delete = has_elevated_mode(request) and request.access.has_permission("users.admin") # Only active superusers can hard delete accounts - if hard_delete and not request.access.has_permission("users.admin"): + # This will be changed to only active staff can delete accounts once the + # staff feature flag is removed. + if hard_delete and not can_delete: return Response( {"detail": "Missing required permission to hard delete account."}, status=status.HTTP_403_FORBIDDEN, diff --git a/src/sentry/api/serializers/models/user.py b/src/sentry/api/serializers/models/user.py index 93099b4c796f1c..99a02ac8add0fc 100644 --- a/src/sentry/api/serializers/models/user.py +++ b/src/sentry/api/serializers/models/user.py @@ -15,7 +15,7 @@ from sentry.api.serializers import Serializer, register from sentry.api.serializers.types import SerializedAvatarFields from sentry.app import env -from sentry.auth.superuser import is_active_superuser +from sentry.auth.elevated_mode import has_elevated_mode from sentry.models.authenticator import Authenticator from sentry.models.authidentity import AuthIdentity from sentry.models.avatars.user_avatar import UserAvatar @@ -122,7 +122,8 @@ def _user_is_requester(self, obj: User, requester: User | AnonymousUser | RpcUse def _get_identities( self, item_list: Sequence[User], user: User ) -> dict[int, list[AuthIdentity]]: - if not (env.request and is_active_superuser(env.request)): + + if not (env.request and has_elevated_mode(env.request)): item_list = [x for x in item_list if x.id == user.id] queryset = AuthIdentity.objects.filter( @@ -290,8 +291,10 @@ def serialize( ) -> DetailedUserSerializerResponse: d = cast(DetailedUserSerializerResponse, super().serialize(obj, attrs, user)) - # XXX(dcramer): we don't use is_active_superuser here as we simply - # want to tell the UI that we're an authenticated superuser, and + # TODO(schew2381): Remove mention of superuser below once the staff feature flag is removed + + # XXX(dcramer): we don't check for active superuser/staff here as we simply + # want to tell the UI that we're an authenticated superuser/staff, and # for requests that require an *active* session, they should prompt # on-demand. This ensures things like links to the Sentry admin can # still easily be rendered. @@ -357,9 +360,10 @@ def serialize( d = cast(DetailedSelfUserSerializerResponse, super().serialize(obj, attrs, user)) # safety check to never return this information if the acting user is not 1) this user, 2) an admin - if user.id == obj.id or user.is_superuser: - # XXX(dcramer): we don't use is_active_superuser here as we simply - # want to tell the UI that we're an authenticated superuser, and + # TODO(schew2381): Remove user.is_superuser once the staff feature flag is removed + if user.id == obj.id or user.is_superuser or user.is_staff: + # XXX(dcramer): we don't check for active superuser/staff here as we simply + # want to tell the UI that we're an authenticated superuser/staff, and # for requests that require an *active* session, they should prompt # on-demand. This ensures things like links to the Sentry admin can # still easily be rendered. diff --git a/tests/sentry/api/endpoints/test_user_details.py b/tests/sentry/api/endpoints/test_user_details.py index 13aaa17c08d925..8ba1536c740cf3 100644 --- a/tests/sentry/api/endpoints/test_user_details.py +++ b/tests/sentry/api/endpoints/test_user_details.py @@ -1,4 +1,5 @@ from django.test import override_settings +from pytest import fixture from sentry.models.deletedorganization import DeletedOrganization from sentry.models.options.user_option import UserOption @@ -10,6 +11,7 @@ from sentry.silo.base import SiloMode from sentry.tasks.deletion.hybrid_cloud import schedule_hybrid_cloud_foreign_key_jobs from sentry.testutils.cases import APITestCase +from sentry.testutils.helpers import Feature from sentry.testutils.helpers.features import with_feature from sentry.testutils.hybrid_cloud import HybridCloudTestMixin from sentry.testutils.outbox import outbox_runner @@ -22,6 +24,8 @@ class UserDetailsTest(APITestCase): def setUp(self): super().setUp() self.user = self.create_user(email="a@example.com", is_managed=False, name="example name") + self.superuser = self.create_user(is_superuser=True) + self.staff_user = self.create_user(is_staff=True) self.login_as(user=self.user) @@ -44,9 +48,8 @@ def test_lookup_self(self): assert not resp.data["options"]["clock24Hours"] assert not resp.data["options"]["issueDetailsNewExperienceQ42023"] - def test_superuser(self): - superuser = self.create_user(email="b@example.com", is_superuser=True) - self.login_as(user=superuser, superuser=True) + def test_superuser_simple(self): + self.login_as(user=self.superuser, superuser=True) resp = self.get_success_response(self.user.id) @@ -54,21 +57,46 @@ def test_superuser(self): assert "identities" in resp.data assert len(resp.data["identities"]) == 0 - def test_includes_roles_and_permissions(self): - superuser = self.create_user(email="b@example.com", is_superuser=True) - self.add_user_permission(superuser, "users.admin") - self.login_as(user=superuser, superuser=True) + @with_feature("auth:enterprise-staff-cookie") + def test_staff_simple(self): + self.login_as(user=self.staff_user, staff=True) + + resp = self.get_success_response(self.user.id) + + assert resp.data["id"] == str(self.user.id) + assert "identities" in resp.data + assert len(resp.data["identities"]) == 0 - resp = self.get_success_response(superuser.id) + def test_superuser_includes_roles_and_permissions(self): + self.add_user_permission(self.superuser, "users.admin") + self.login_as(user=self.superuser, superuser=True) - assert resp.data["id"] == str(superuser.id) + resp = self.get_success_response(self.superuser.id) + + assert resp.data["id"] == str(self.superuser.id) assert "permissions" in resp.data assert resp.data["permissions"] == ["users.admin"] role = UserRole.objects.create(name="test", permissions=["broadcasts.admin"]) - role.users.add(superuser) + role.users.add(self.superuser) - resp = self.get_success_response(superuser.id) + resp = self.get_success_response(self.superuser.id) + assert resp.data["permissions"] == ["broadcasts.admin", "users.admin"] + + def test_staff_includes_roles_and_permissions(self): + self.add_user_permission(self.staff_user, "users.admin") + self.login_as(user=self.staff_user, staff=True) + + resp = self.get_success_response(self.staff_user.id) + + assert resp.data["id"] == str(self.staff_user.id) + assert "permissions" in resp.data + assert resp.data["permissions"] == ["users.admin"] + + role = UserRole.objects.create(name="test", permissions=["broadcasts.admin"]) + role.users.add(self.staff_user) + + resp = self.get_success_response(self.staff_user.id) assert resp.data["permissions"] == ["broadcasts.admin", "users.admin"] @@ -165,8 +193,7 @@ class UserDetailsSuperuserUpdateTest(UserDetailsTest): def test_superuser_can_change_is_active(self): self.user.update(is_active=True) - superuser = self.create_user(email="b@example.com", is_superuser=True) - self.login_as(user=superuser, superuser=True) + self.login_as(user=self.superuser, superuser=True) resp = self.get_success_response( self.user.id, @@ -179,9 +206,8 @@ def test_superuser_can_change_is_active(self): def test_superuser_with_permission_can_change_is_active(self): self.user.update(is_active=True) - superuser = self.create_user(email="b@example.com", is_superuser=True) - UserPermission.objects.create(user=superuser, permission="users.admin") - self.login_as(user=superuser, superuser=True) + UserPermission.objects.create(user=self.superuser, permission="users.admin") + self.login_as(user=self.superuser, superuser=True) resp = self.get_success_response( self.user.id, @@ -227,8 +253,7 @@ def test_superuser_write_can_change_is_active(self): def test_superuser_cannot_add_superuser(self): self.user.update(is_superuser=False) - superuser = self.create_user(email="b@example.com", is_superuser=True) - self.login_as(user=superuser, superuser=True) + self.login_as(user=self.superuser, superuser=True) resp = self.get_error_response( self.user.id, @@ -242,8 +267,7 @@ def test_superuser_cannot_add_superuser(self): def test_superuser_cannot_add_staff(self): self.user.update(is_staff=False) - superuser = self.create_user(email="b@example.com", is_superuser=True) - self.login_as(user=superuser, superuser=True) + self.login_as(user=self.superuser, superuser=True) resp = self.get_error_response( self.user.id, @@ -257,9 +281,8 @@ def test_superuser_cannot_add_staff(self): def test_superuser_with_permission_can_add_superuser(self): self.user.update(is_superuser=False) - superuser = self.create_user(email="b@example.com", is_superuser=True) - UserPermission.objects.create(user=superuser, permission="users.admin") - self.login_as(user=superuser, superuser=True) + UserPermission.objects.create(user=self.superuser, permission="users.admin") + self.login_as(user=self.superuser, superuser=True) resp = self.get_success_response( self.user.id, @@ -272,9 +295,127 @@ def test_superuser_with_permission_can_add_superuser(self): def test_superuser_with_permission_can_add_staff(self): self.user.update(is_staff=False) - superuser = self.create_user(email="b@example.com", is_superuser=True) - UserPermission.objects.create(user=superuser, permission="users.admin") - self.login_as(user=superuser, superuser=True) + UserPermission.objects.create(user=self.superuser, permission="users.admin") + self.login_as(user=self.superuser, superuser=True) + + resp = self.get_success_response( + self.user.id, + isStaff="true", + ) + assert resp.data["id"] == str(self.user.id) + + user = User.objects.get(id=self.user.id) + assert user.is_staff + + +@control_silo_test +class UserDetailsStaffUpdateTest(UserDetailsTest): + method = "put" + + @fixture(autouse=True) + def use_staff_feature_flag(self): + with Feature("auth:enterprise-staff-cookie"): + yield + + def test_staff_can_change_is_active(self): + self.user.update(is_active=True) + self.login_as(user=self.staff_user, staff=True) + + resp = self.get_success_response( + self.user.id, + isActive="false", + ) + assert resp.data["id"] == str(self.user.id) + + user = User.objects.get(id=self.user.id) + assert not user.is_active + + def test_staff_with_permission_can_change_is_active(self): + self.user.update(is_active=True) + UserPermission.objects.create(user=self.staff_user, permission="users.admin") + self.login_as(user=self.staff_user, staff=True) + + resp = self.get_success_response( + self.user.id, + isActive="false", + ) + assert resp.data["id"] == str(self.user.id) + + user = User.objects.get(id=self.user.id) + assert not user.is_active + + def test_staff_cannot_add_superuser(self): + self.user.update(is_superuser=False) + self.login_as(user=self.staff_user, staff=True) + + resp = self.get_error_response( + self.user.id, + isSuperuser="true", + status_code=403, + ) + assert resp.data["detail"] == "Missing required permission to add superuser." + + user = User.objects.get(id=self.user.id) + assert not user.is_superuser + + def test_staff_cannot_add_staff(self): + self.user.update(is_staff=False) + + self.login_as(user=self.staff_user, staff=True) + + resp = self.get_error_response( + self.user.id, + isStaff="true", + status_code=403, + ) + assert resp.data["detail"] == "Missing required permission to add admin." + + user = User.objects.get(id=self.user.id) + assert not user.is_staff + + def test_superuser_cannot_add_superuser_or_staff_with_feature_flag(self): + self.user.update(is_staff=False) + + self.login_as(user=self.superuser, superuser=True) + + resp = self.get_error_response( + self.user.id, + isStaff="true", + status_code=403, + ) + assert resp.data["detail"] == "Missing required permission to add admin." + + resp = self.get_error_response( + self.user.id, + isSuperuser="true", + status_code=403, + ) + assert resp.data["detail"] == "Missing required permission to add superuser." + + user = User.objects.get(id=self.user.id) + assert not user.is_staff + assert not user.is_superuser + + def test_staff_with_permission_can_add_superuser(self): + self.user.update(is_superuser=False) + + UserPermission.objects.create(user=self.staff_user, permission="users.admin") + self.login_as(user=self.staff_user, staff=True) + + resp = self.get_success_response( + self.user.id, + isSuperuser="true", + ) + assert resp.data["id"] == str(self.user.id) + + user = User.objects.get(id=self.user.id) + assert user.is_superuser + + def test_staff_with_permission_can_add_staff(self): + self.user.update(is_staff=False) + + UserPermission.objects.create(user=self.staff_user, permission="users.admin") + self.login_as(user=self.staff_user, staff=True) resp = self.get_success_response( self.user.id, @@ -391,31 +532,65 @@ def test_cannot_hard_delete_self(self): # Cannot hard delete your own account self.get_error_response(self.user.id, hardDelete=True, organizations=[], status_code=403) - def test_hard_delete_account_without_permission(self): - self.user.update(is_superuser=True) + def test_superuser_hard_delete_account_without_permission(self): + self.login_as(user=self.superuser, superuser=True) user2 = self.create_user(email="user2@example.com") - # failed authorization, user does not have permissions to delete another user - self.get_error_response(user2.id, hardDelete=True, organizations=[], status_code=403) + # failed authorization, user does not have users.admin permission to hard delete another user + response = self.get_error_response( + user2.id, hardDelete=True, organizations=[], status_code=403 + ) - # Reauthenticate as super user to hard delete an account - self.login_as(user=self.user, superuser=True) + assert response.data["detail"] == "Missing required permission to hard delete account." + assert User.objects.filter(id=user2.id).exists() - self.get_error_response(user2.id, hardDelete=True, organizations=[], status_code=403) + @with_feature("auth:enterprise-staff-cookie") + def test_staff_hard_delete_account_without_permission(self): + self.login_as(user=self.staff_user, staff=True) + user2 = self.create_user(email="user2@example.com") + + # failed authorization, user does not have users.admin permission to hard delete another user + response = self.get_error_response( + user2.id, hardDelete=True, organizations=[], status_code=403 + ) + assert response.data["detail"] == "Missing required permission to hard delete account." assert User.objects.filter(id=user2.id).exists() - def test_hard_delete_account_with_permission(self): - self.user.update(is_superuser=True) + def test_superuser_hard_delete_account_with_permission(self): + self.login_as(user=self.superuser, superuser=True) user2 = self.create_user(email="user2@example.com") - # failed authorization, user does not have permissions to delete another user - self.get_error_response(user2.id, hardDelete=True, organizations=[], status_code=403) - - # Reauthenticate as super user to hard delete an account - UserPermission.objects.create(user=self.user, permission="users.admin") - self.login_as(user=self.user, superuser=True) + # Add users.admin permission to superuser + UserPermission.objects.create(user=self.superuser, permission="users.admin") self.get_success_response(user2.id, hardDelete=True, organizations=[], status_code=204) + assert not User.objects.filter(id=user2.id).exists() + + @with_feature("auth:enterprise-staff-cookie") + def test_staff_hard_delete_account_with_permission(self): + self.login_as(user=self.staff_user, staff=True) + user2 = self.create_user(email="user2@example.com") + # Add users.admin permission to staff + UserPermission.objects.create(user=self.staff_user, permission="users.admin") + + self.get_success_response(user2.id, hardDelete=True, organizations=[], status_code=204) assert not User.objects.filter(id=user2.id).exists() + + @with_feature("auth:enterprise-staff-cookie") + def test_superuser_cannot_hard_delete_with_active_feature_flag(self): + self.login_as(user=self.superuser, superuser=True) + user2 = self.create_user(email="user2@example.com") + + # Add users.admin permission to superuser + UserPermission.objects.create(user=self.superuser, permission="users.admin") + + # Superusers will eventually be prevented from hard deleting accounts + # once the feature flag is removed + response = self.get_error_response( + user2.id, hardDelete=True, organizations=[], status_code=403 + ) + + assert response.data["detail"] == "Missing required permission to hard delete account." + assert User.objects.filter(id=user2.id).exists() diff --git a/tests/sentry/api/serializers/test_user.py b/tests/sentry/api/serializers/test_user.py index 488735017ab590..10e77bdb457e41 100644 --- a/tests/sentry/api/serializers/test_user.py +++ b/tests/sentry/api/serializers/test_user.py @@ -101,31 +101,34 @@ def test_with_avatar(self): @control_silo_test class DetailedSelfUserSerializerTest(TestCase): - def test_simple(self): - user = self.create_user() - UserPermission.objects.create(user=user, permission="foo") + def setUp(self): + super().setUp() + self.user = self.create_user() + UserPermission.objects.create(user=self.user, permission="foo") - org = self.create_organization(owner=user) + org = self.create_organization(owner=self.user) auth_provider = AuthProvider.objects.create(organization_id=org.id, provider="dummy") - auth_identity = AuthIdentity.objects.create( - auth_provider=auth_provider, ident=user.email, user=user + self.auth_identity = AuthIdentity.objects.create( + auth_provider=auth_provider, ident=self.user.email, user=self.user ) - auth = Authenticator.objects.create( - type=available_authenticators(ignore_backup=True)[0].type, user=user + self.auth = Authenticator.objects.create( + type=available_authenticators(ignore_backup=True)[0].type, user=self.user ) - result = serialize(user, user, DetailedSelfUserSerializer()) - assert result["id"] == str(user.id) + def test_simple(self): + result = serialize(self.user, self.user, DetailedSelfUserSerializer()) + + assert result["id"] == str(self.user.id) assert result["has2fa"] is True assert len(result["emails"]) == 1 - assert result["emails"][0]["email"] == user.email + assert result["emails"][0]["email"] == self.user.email assert result["emails"][0]["is_verified"] assert "identities" in result assert len(result["identities"]) == 1 - assert result["identities"][0]["id"] == str(auth_identity.id) - assert result["identities"][0]["name"] == auth_identity.ident + assert result["identities"][0]["id"] == str(self.auth_identity.id) + assert result["identities"][0]["name"] == self.auth_identity.ident assert "authenticators" in result assert len(result["authenticators"]) == 1 - assert result["authenticators"][0]["id"] == str(auth.id) + assert result["authenticators"][0]["id"] == str(self.auth.id) assert result["permissions"] == ["foo"]