Skip to content

Commit

Permalink
protect last management group from deletion/demotion
Browse files Browse the repository at this point in the history
  • Loading branch information
felixrindt committed Aug 26, 2023
1 parent 47db966 commit 6742604
Show file tree
Hide file tree
Showing 15 changed files with 182 additions and 54 deletions.
16 changes: 10 additions & 6 deletions ephios/core/forms/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ def __init__(self, **kwargs):
if (event := kwargs.get("instance", None)) is not None:
self.eventtype = event.type
responsible_users = get_users_with_perms(
event, only_with_perms_in=["change_event"], with_group_users=False
event, only_with_perms_in=["core.change_event"], with_group_users=False
)
responsible_groups = get_groups_with_perms(event, only_with_perms_in=["change_event"])
visible_for = get_groups_with_perms(event, only_with_perms_in=["view_event"]).exclude(
id__in=responsible_groups
responsible_groups = get_groups_with_perms(
event, only_with_perms_in=["core.change_event"]
)
visible_for = get_groups_with_perms(
event, only_with_perms_in=["core.view_event"]
).exclude(id__in=responsible_groups)

self.locked_visible_for_groups = set(visible_for.exclude(id__in=can_publish_for_groups))
kwargs["initial"] = {
Expand Down Expand Up @@ -119,11 +121,13 @@ def save(self, commit=True):
# delete existing permissions
# (better implement https://github.com/django-guardian/django-guardian/issues/654)
for group in get_groups_with_perms(
event, only_with_perms_in=["view_event", "change_event"]
event, only_with_perms_in=["core.view_event", "core.change_event"]
):
remove_perm("view_event", group, event)
remove_perm("change_event", group, event)
for user in get_users_with_perms(event, only_with_perms_in=["view_event", "change_event"]):
for user in get_users_with_perms(
event, only_with_perms_in=["core.view_event", "core.change_event"]
):
remove_perm("view_event", user, event)
remove_perm("change_event", user, event)

Expand Down
39 changes: 38 additions & 1 deletion ephios/core/forms/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from ephios.core.signals import register_group_permission_fields
from ephios.core.widgets import MultiUserProfileWidget
from ephios.extra.crispy import AbortLink
from ephios.extra.permissions import PermissionField, PermissionFormMixin
from ephios.extra.permissions import PermissionField, PermissionFormMixin, get_groups_with_perms
from ephios.extra.widgets import CustomDateInput
from ephios.modellogging.log import add_log_recorder
from ephios.modellogging.recorders import DerivedFieldsLogRecorder
Expand Down Expand Up @@ -176,6 +176,23 @@ def __init__(self, **kwargs):
),
)

def clean_is_management_group(self):
is_management_group = self.cleaned_data["is_management_group"]
if self.initial.get("is_management_group", False) and not is_management_group:
other_management_groups = get_groups_with_perms(
only_with_perms_in=CORE_MANAGEMENT_PERMISSIONS
).exclude(pk=self.instance.pk)
if not other_management_groups.exists():
raise ValidationError(
_(
"At least one group with management permissions must exist. "
"Please promote another group before demoting this one."
)
)
if is_management_group:
self.cleaned_data["is_hr_group"] = True
return is_management_group

def save(self, commit=True):
add_log_recorder(self.instance, DerivedFieldsLogRecorder(get_group_permission_log_fields))
group = super().save(commit)
Expand Down Expand Up @@ -296,6 +313,26 @@ def clean(self):
)


class DeleteGroupForm(Form):
def __init__(self, *args, **kwargs):
self.instance = kwargs.pop("instance")
super().__init__(*args, **kwargs)

def clean(self):
management_groups = get_groups_with_perms(only_with_perms_in=CORE_MANAGEMENT_PERMISSIONS)

if (
self.instance in management_groups
and not management_groups.exclude(pk=self.instance.pk).exists()
):
raise ValidationError(
_(
"At least one group with management permissions must exist. "
"Please promote another group before deleting this one."
)
)


class QualificationGrantForm(ModelForm):
model = QualificationGrant

Expand Down
2 changes: 1 addition & 1 deletion ephios/core/ical.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def item_guid(self, item):
return f"{item.pk}@{settings.GET_SITE_URL()}"

def item_organizer(self, item):
user = get_users_with_perms(item.event, only_with_perms_in=["change_event"]).first()
user = get_users_with_perms(item.event, only_with_perms_in=["core.change_event"]).first()
email = user.email if user else ""
return vCalAddress(f"MAILTO:{email}")

Expand Down
8 changes: 4 additions & 4 deletions ephios/core/services/notifications/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class NewEventNotification(AbstractNotificationHandler):
@classmethod
def send(cls, event: Event, **kwargs):
notifications = []
for user in get_users_with_perms(event, only_with_perms_in=["view_event"]):
for user in get_users_with_perms(event, only_with_perms_in=["core.view_event"]):
notifications.append(
Notification(slug=cls.slug, user=user, data={"event_id": event.id, **kwargs})
)
Expand Down Expand Up @@ -299,7 +299,7 @@ class ResponsibleMixin:
@classmethod
def _responsible_users(cls, participation: AbstractParticipation):
users = get_users_with_perms(
participation.shift.event, only_with_perms_in=["change_event"]
participation.shift.event, only_with_perms_in=["core.change_event"]
).distinct()
for user in users:
if (
Expand Down Expand Up @@ -435,7 +435,7 @@ def send(cls, event: Event):
pk__in=AbstractParticipation.objects.filter(shift__event=event).values_list(
"localparticipation__user", flat=True
)
).filter(pk__in=get_users_with_perms(event, only_with_perms_in=["view_event"]))
).filter(pk__in=get_users_with_perms(event, only_with_perms_in=["core.view_event"]))
notifications = []
for user in users_not_participating:
notifications.append(
Expand Down Expand Up @@ -472,7 +472,7 @@ class CustomEventParticipantNotification(AbstractNotificationHandler):
def send(cls, event: Event, content: str):
participants = set()
responsible_users = get_users_with_perms(
event, with_superusers=False, only_with_perms_in=["change_event"]
event, with_superusers=False, only_with_perms_in=["core.change_event"]
)
for shift in event.shifts.all():
participants.update(shift.get_participants())
Expand Down
2 changes: 2 additions & 0 deletions ephios/core/templates/core/group_confirm_delete.html
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{% extends "base.html" %}
{% load crispy_forms_filters %}
{% load i18n %}
{% load static %}

Expand All @@ -12,6 +13,7 @@ <h1>{% trans "Delete group" %}</h1>
</div>

<form method="post">{% csrf_token %}
{{ form|as_crispy_errors }}
<p>{% blocktrans %}Are you sure you want to delete the group "{{ group }}"?{% endblocktrans %}</p>
<a role="button" class="btn btn-secondary" href="{% url "core:group_list" %}">{% trans "Back" %}</a>
<button type="submit" class="btn btn-danger">{% trans "Delete" %}</button>
Expand Down
11 changes: 9 additions & 2 deletions ephios/core/views/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from ephios.api.access.auth import revoke_all_access_tokens
from ephios.core.forms.users import (
DeleteGroupForm,
DeleteUserProfileForm,
GroupForm,
QualificationGrantFormset,
Expand Down Expand Up @@ -145,8 +146,8 @@ def get_form_kwargs(self):
def get_success_url(self):
messages.info(
self.request,
_("The user {name} ({user}) was deleted.").format(
name=self.object.get_full_name(), user=self.object
_("The user {name} ({mail}) was deleted.").format(
name=self.object.get_full_name(), mail=self.object.email
),
)
return reverse("core:userprofile_list")
Expand Down Expand Up @@ -244,6 +245,12 @@ class GroupDeleteView(CustomPermissionRequiredMixin, DeleteView):
model = Group
permission_required = "auth.delete_group"
template_name = "core/group_confirm_delete.html"
form_class = DeleteGroupForm

def get_form_kwargs(self):
kwargs = super().get_form_kwargs()
kwargs["instance"] = self.object
return kwargs

def get_success_url(self):
messages.info(self.request, _('The group "{group}" was deleted.').format(group=self.object))
Expand Down
4 changes: 3 additions & 1 deletion ephios/core/views/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,9 @@ def form_valid(self, form):
assign_perm(
"change_event",
get_users_with_perms(
self.get_object(), only_with_perms_in=["change_event"], with_group_users=False
self.get_object(),
only_with_perms_in=["core.change_event"],
with_group_users=False,
),
event,
)
Expand Down
83 changes: 59 additions & 24 deletions ephios/extra/permissions.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
from functools import wraps
from typing import Iterable

from django.contrib.auth.models import Group, Permission
from django.core.exceptions import PermissionDenied
from django.db.models import Q
from django.forms import BooleanField
from guardian.ctypes import get_content_type
from guardian.shortcuts import assign_perm, remove_perm
from guardian.utils import get_group_obj_perms_model

Q_FALSE = Q(pk__in=[])


def staff_required(view_func):
@wraps(view_func)
Expand All @@ -22,29 +26,62 @@ def _wrapped_view(request, *args, **kwargs):
return _wrapped_view


def get_groups_with_perms(obj, only_with_perms_in):
ctype = get_content_type(obj)
group_model = get_group_obj_perms_model(obj)
def get_groups_with_perms(obj=None, only_with_perms_in=None, must_have_all_perms=True):
group_perms_model = get_group_obj_perms_model(obj)
group_perms_rel_name = group_perms_model.group.field.related_query_name()

qs = Group.objects.all()
required_perms = get_permissions_from_qualified_names(only_with_perms_in or [])

obj_filter = {}
if obj is not None:
ctype = get_content_type(obj)
if group_perms_model.objects.is_generic():
obj_filter = {
f"{group_perms_rel_name}__content_type": ctype,
f"{group_perms_rel_name}__object_pk": obj.pk,
}
else:
obj_filter = {f"{group_perms_rel_name}__content_object": obj}

if must_have_all_perms:
for perm in required_perms:
qs = qs.filter(
Q(
**{
f"{group_perms_rel_name}__permission": perm,
**obj_filter,
}
)
| Q(permissions=perm)
)
else:
qs = qs.filter(
Q(
**{
f"{group_perms_rel_name}__permission__in": required_perms,
**obj_filter,
}
)
| Q(permissions__in=required_perms)
)
return qs.distinct()


group_rel_name = group_model.group.field.related_query_name()
def get_groups_with_all_permissions_in(permission_list: Iterable[str]):
permissions = get_permissions_from_qualified_names(permission_list)
qs = Group.objects.all()
for permission in permissions:
qs = qs.filter(permissions=permission)
return qs

if group_model.objects.is_generic():
group_filters = {
f"{group_rel_name}__content_type": ctype,
f"{group_rel_name}__object_pk": obj.pk,
}
else:
group_filters = {f"{group_rel_name}__content_object": obj}

permission_ids = Permission.objects.filter(
content_type=ctype, codename__in=only_with_perms_in
).values_list("id", flat=True)
group_filters.update(
{
f"{group_rel_name}__permission_id__in": permission_ids,
}
)
return Group.objects.filter(**group_filters).distinct()
def get_permissions_from_qualified_names(qualified_names):
perms_filter = Q_FALSE
for qualified_name in qualified_names:
app_label, codename = qualified_name.split(".")
perms_filter = perms_filter | Q(content_type__app_label=app_label, codename=codename)
return Permission.objects.filter(perms_filter)


class PermissionField(BooleanField):
Expand All @@ -60,10 +97,8 @@ def __init__(self, *args, **kwargs):

def set_initial_value(self, user_or_group):
self.target = user_or_group
codename_set = set(map(lambda perm: perm.split(".")[-1], self.permission_set))
self.initial = codename_set <= set(
self.target.permissions.values_list("codename", flat=True)
)
wanted_permission_objects = get_permissions_from_qualified_names(self.permission_set)
self.initial = set(wanted_permission_objects) <= set(self.target.permissions.all())

def assign_permissions(self, target):
for permission in self.permission_set:
Expand Down
14 changes: 9 additions & 5 deletions tests/core/test_event_form.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ def test_create_event(django_app, planner, superuser, service_event_type, groups
event = shift_form.submit().follow().context["event"]
assert event.title == "Seeed Concert"
assert event.type == service_event_type
assert set(get_groups_with_perms(event, only_with_perms_in=["view_event"])) == {
assert set(get_groups_with_perms(event, only_with_perms_in=["core.view_event"])) == {
volunteers,
planners,
}
assert set(get_groups_with_perms(event, only_with_perms_in=["change_event"])) == {planners}
assert set(get_groups_with_perms(event, only_with_perms_in=["core.change_event"])) == {planners}
assert set(
get_users_with_perms(event, only_with_perms_in=["change_event"], with_group_users=False)
get_users_with_perms(
event, only_with_perms_in=["core.change_event"], with_group_users=False
)
) == {planner}
assert set(get_users_with_perms(event, only_with_perms_in=["change_event"])) == {
assert set(get_users_with_perms(event, only_with_perms_in=["core.change_event"])) == {
planner,
superuser,
} # superuser is in planner group
Expand Down Expand Up @@ -65,7 +67,9 @@ def test_add_responsible_user_to_event(django_app, planner, event, responsible_u
response.form["responsible_users"].force_value([responsible_user.id])
event = response.form.submit().follow().context["event"]
assert set(
get_users_with_perms(event, only_with_perms_in=["change_event"], with_group_users=False)
get_users_with_perms(
event, only_with_perms_in=["core.change_event"], with_group_users=False
)
) == {responsible_user}


Expand Down
8 changes: 4 additions & 4 deletions tests/core/test_event_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def test_mail_new_event(self, django_app, event, volunteer, planner, groups):
assert response.status_code == 302
assert (
Notification.objects.count()
== get_users_with_perms(event, only_with_perms_in=["view_event"]).count()
== get_users_with_perms(event, only_with_perms_in=["core.view_event"]).count()
)

def test_mail_reminder(self, django_app, event, volunteer, planner, groups):
Expand All @@ -29,7 +29,7 @@ def test_mail_reminder(self, django_app, event, volunteer, planner, groups):
pk__in=AbstractParticipation.objects.filter(shift__event=event).values_list(
"localparticipation__user", flat=True
)
).filter(pk__in=get_users_with_perms(event, only_with_perms_in=["view_event"]))
).filter(pk__in=get_users_with_perms(event, only_with_perms_in=["core.view_event"]))
assert Notification.objects.count() == users_not_participating.count()

def test_mail_participants(self, django_app, event, volunteer, planner, groups):
Expand All @@ -46,7 +46,7 @@ def test_mail_participants(self, django_app, event, volunteer, planner, groups):
response = form.submit()
assert response.status_code == 302
assert Notification.objects.count() == 1 + len(
get_users_with_perms(event, only_with_perms_in=["change_event"])
get_users_with_perms(event, only_with_perms_in=["core.change_event"])
)

def test_mail_participants_content_required(
Expand Down Expand Up @@ -75,6 +75,6 @@ def test_participants_mail_is_not_duplicated_for_participating_responsible(
form["mail_content"] = "hey Franz was here"
form.submit()
number_of_responsible_users = len(
get_users_with_perms(event, only_with_perms_in=["change_event"])
get_users_with_perms(event, only_with_perms_in=["core.change_event"])
)
assert Notification.objects.count() == number_of_responsible_users
4 changes: 2 additions & 2 deletions tests/core/test_notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_event_notification_sending(self, event, volunteer):
NewEventNotification.send(event)
EventReminderNotification.send(event)
assert Notification.objects.count() == 2 * len(
get_users_with_perms(event, only_with_perms_in=["view_event"])
get_users_with_perms(event, only_with_perms_in=["core.view_event"])
)
call_command("send_notifications")
assert Notification.objects.count() == 0
Expand All @@ -77,7 +77,7 @@ def test_participation_notification_sending(self, event, qualified_volunteer):
ParticipationRejectedNotification.send(participation)
ResponsibleParticipationRequestedNotification.send(participation)
assert Notification.objects.count() == 2 + len(
get_users_with_perms(event, only_with_perms_in=["change_event"])
get_users_with_perms(event, only_with_perms_in=["core.change_event"])
)
call_command("send_notifications")
assert Notification.objects.count() == 0
Expand Down
Loading

0 comments on commit 6742604

Please sign in to comment.