Skip to content

Commit

Permalink
[fix] Fixed validation in change device group admin action #762
Browse files Browse the repository at this point in the history
Closes #762
  • Loading branch information
pandafy authored Jun 7, 2023
1 parent 2d21f42 commit 6adafe3
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 18 deletions.
49 changes: 40 additions & 9 deletions openwisp_controller/config/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
ValidationError,
)
from django.http import Http404, HttpResponse, HttpResponseRedirect, JsonResponse
from django.http.response import HttpResponseForbidden
from django.shortcuts import get_object_or_404
from django.template.loader import get_template
from django.template.response import TemplateResponse
Expand Down Expand Up @@ -438,11 +439,20 @@ def get_fields(self, request, obj):

class ChangeDeviceGroupForm(forms.Form):
device_group = forms.ModelChoiceField(
queryset=DeviceGroup.objects.all(),
# The queryset is set in the __init__ method
# after filtering the groups according the
# device's organization
queryset=DeviceGroup.objects.none(),
label=_('Group'),
required=False,
)

def __init__(self, org_id, **kwargs):
super().__init__(**kwargs)
self.fields['device_group'].queryset = DeviceGroup.objects.filter(
organization_id=org_id
)


class DeviceAdmin(MultitenantAdminMixin, BaseConfigAdmin, UUIDAdmin):
recover_form_template = 'admin/config/device_recover_form.html'
Expand Down Expand Up @@ -559,28 +569,49 @@ def construct_change_message(self, request, form, formsets, add=False):
return super().construct_change_message(request, form, formsets, add)

def change_group(self, request, queryset):
# Validate all selected devices belong to the same organization
# which is managed by the user.
org_id = None
if queryset:
org_id = queryset[0].organization_id
if (
not request.user.is_superuser
and str(org_id) not in request.user.organizations_managed
):
logger.warning(f'{request.user} does not manage "{org_id}" organization.')
return HttpResponseForbidden()
if len(queryset) != queryset.filter(organization_id=org_id).count():
self.message_user(
request,
_('Select devices from one organization'),
messages.ERROR,
)
return HttpResponseRedirect(request.get_full_path())

if 'apply' in request.POST:
form = ChangeDeviceGroupForm(request.POST)
form = ChangeDeviceGroupForm(data=request.POST, org_id=org_id)
if form.is_valid():
group = form.cleaned_data['device_group']
instances, old_group_ids = map(
list, zip(*queryset.values_list('id', 'group'))
)
# Evaluate queryset to store old group id
old_group_qs = list(queryset)
queryset.update(group=group or None)
group_id = None
if group:
group_id = group.id
Device._send_device_group_changed_signal(
instance=instances, group_id=group_id, old_group_id=old_group_ids
)
for device in old_group_qs:
Device._send_device_group_changed_signal(
instance=device,
group_id=group_id,
old_group_id=device.group_id,
)
self.message_user(
request,
_('Successfully changed group of selected devices.'),
messages.SUCCESS,
)
return HttpResponseRedirect(request.get_full_path())

form = ChangeDeviceGroupForm()
form = ChangeDeviceGroupForm(org_id=org_id)
context = {
'title': _('Change group'),
'queryset': queryset,
Expand Down
61 changes: 52 additions & 9 deletions openwisp_controller/config/tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1727,40 +1727,83 @@ def test_group_templates_apply(self):

def test_change_device_group_action_changes_templates(self):
path = reverse(f'admin:{self.app_label}_device_changelist')
org = self._get_org(org_name='default')
org1 = self._create_org(name='org1', slug='org1')
org2 = self._create_org(name='org2', slug='org2')
t1 = self._create_template(name='t1')
t2 = self._create_template(name='t2')
dg1 = self._create_device_group(name='test-group-1', organization=org)
dg1 = self._create_device_group(name='test-group-1', organization=org1)
dg1.templates.add(t1)
dg2 = self._create_device_group(name='test-group-2', organization=org)
dg2 = self._create_device_group(name='test-group-2', organization=org1)
dg2.templates.add(t2)
device = self._create_device(organization=org, group=dg1)
templates = device.config.templates.all()
device1 = self._create_device(organization=org1, group=dg1)
device2 = self._create_device_config(
device_opts={'organization': org1, 'mac_address': '11:22:33:44:55:66'}
)
templates = device1.config.templates.all()
self.assertNotIn(t2, templates)
self.assertIn(t1, templates)
post_data = {
'_selected_action': [device.pk],
'_selected_action': [device1.pk],
'action': 'change_group',
'csrfmiddlewaretoken': 'test',
'apply': True,
}

with self.subTest('change group'):
post_data['device_group'] = str(dg2.pk)
post_data['apply'] = True
response = self.client.post(path, post_data, follow=True)
self.assertEqual(response.status_code, 200)
self.assertContains(
response, 'Successfully changed group of selected devices.'
)
templates = device.config.templates.all()
templates = device1.config.templates.all()
self.assertIn(t2, templates)
self.assertNotIn(t1, templates)

with self.subTest('unassign group'):
post_data['device_group'] = ''
response = self.client.post(path, post_data, follow=True)
self.assertEqual(response.status_code, 200)
templates = list(device.config.templates.all())
templates = list(device1.config.templates.all())
self.assertEqual(templates, [])

with self.subTest('Change group for multiple devices'):
data = post_data.copy()
data['_selected_action'] = [device1.pk, device2.pk]
data['device_group'] = str(dg2.pk)
with patch.object(Device, '_send_device_group_changed_signal') as mocked:
response = self.client.post(path, data, follow=True)
self.assertEqual(response.status_code, 200)
self.assertEqual(len(mocked.call_args_list), 2)

device2.organization = org2
device2.save()

with self.subTest('Select devices from different organization'):
data = post_data.copy()
data['_selected_action'] = [device1.pk, device2.pk]
response = self.client.post(path, data, follow=True)
self.assertContains(response, 'Select devices from one organization')

data.pop('apply')
data.pop('device_group')
response = self.client.post(path, data, follow=True)
self.assertContains(response, 'Select devices from one organization')

org_user = self._create_administrator(organizations=[org1])
self.client.force_login(org_user)

with self.subTest('Select devices from org not managed by user'):
data = post_data.copy()
data['_selected_action'] = [device2.pk]
response = self.client.post(path, data, follow=True)
self.assertEqual(response.status_code, 403)

data.pop('apply')
data.pop('device_group')
response = self.client.post(path, data, follow=True)
self.assertEqual(response.status_code, 403)

def test_change_device_group_changes_templates(self):
org = self._get_org(org_name='default')
t1 = self._create_template(name='t1')
Expand Down

0 comments on commit 6adafe3

Please sign in to comment.