diff --git a/kobo/apps/accounts/tests/test_backend.py b/kobo/apps/accounts/tests/test_backend.py index 1271d01798..c7ef6dc0eb 100644 --- a/kobo/apps/accounts/tests/test_backend.py +++ b/kobo/apps/accounts/tests/test_backend.py @@ -1,16 +1,17 @@ import json -from mock import patch import responses from allauth.socialaccount.models import SocialAccount, SocialApp from django.conf import settings -from django.test.utils import override_settings from django.test import TestCase +from django.test.utils import override_settings from django.urls import reverse +from mock import patch from rest_framework import status -from kobo.apps.openrosa.apps.main.models import UserProfile +from kobo.apps.audit_log.models import AuditAction, AuditLog from kobo.apps.kobo_auth.shortcuts import User +from kobo.apps.openrosa.apps.main.models import UserProfile from .constants import SOCIALACCOUNT_PROVIDERS @@ -98,4 +99,7 @@ def test_keep_django_auth_backend_with_sso(self, mock_verify_and_unstash_state): self.assertRedirects(response, reverse(settings.LOGIN_REDIRECT_URL)) self.assertTrue(response.wsgi_request.user.is_authenticated) + # Ensure there is a record of the login + audit_log: AuditLog = AuditLog.objects.filter(user=response.wsgi_request.user).first() + self.assertEquals(audit_log.action, AuditAction.AUTH) assert response.wsgi_request.user.backend == settings.AUTHENTICATION_BACKENDS[0] diff --git a/kobo/apps/audit_log/models.py b/kobo/apps/audit_log/models.py index 279378336a..350122a71f 100644 --- a/kobo/apps/audit_log/models.py +++ b/kobo/apps/audit_log/models.py @@ -1,8 +1,21 @@ +import logging + +from django.conf import settings +from django.contrib.auth.models import AnonymousUser from django.contrib.contenttypes.models import ContentType from django.db import models -from django.conf import settings from django.utils.timezone import now +from kobo.apps.kobo_auth.shortcuts import User +from kobo.apps.openrosa.libs.utils.viewer_tools import ( + get_client_ip, + get_human_readable_client_user_agent, +) +from kpi.constants import ( + ACCESS_LOG_KOBO_AUTH_APP_LABEL, + ACCESS_LOG_LOGINAS_AUTH_TYPE, + ACCESS_LOG_UNKNOWN_AUTH_TYPE, +) from kpi.fields.kpi_uid import UUID_LENGTH @@ -60,3 +73,51 @@ def save( using=using, update_fields=update_fields, ) + + @staticmethod + def create_access_log_for_request(request, user=None, authentication_type: str = None): + logged_in_user = user or request.user + + # django-loginas will keep the superuser as the _cached_user while request.user is set to the new one + # sometimes there won't be a cached user at all, mostly in tests + initial_user = getattr(request, '_cached_user', None) + is_loginas_url = request.resolver_match is not None and request.resolver_match.url_name == 'loginas-user-login' + # a regular login may have an anonymous user as _cached_user, ignore that + user_changed = initial_user and initial_user.is_authenticated and initial_user.id != logged_in_user.id + is_loginas = is_loginas_url and user_changed + if authentication_type and authentication_type != '': + # authentication_type parameter has precedence + auth_type = authentication_type + elif is_loginas: + # second option: loginas + auth_type = ACCESS_LOG_LOGINAS_AUTH_TYPE + elif hasattr(logged_in_user, 'backend') and logged_in_user.backend is not None: + # third option: the backend that authenticated the user + auth_type = logged_in_user.backend + else: + # default: unknown + auth_type = ACCESS_LOG_UNKNOWN_AUTH_TYPE + + # gather information about the source of the request + ip = get_client_ip(request) + source = get_human_readable_client_user_agent(request) + metadata = { + 'ip_address': ip, + 'source': source, + 'auth_type': auth_type, + } + + # add extra information if needed for django-loginas + if is_loginas: + metadata['initial_user_uid'] = initial_user.extra_details.uid + metadata['initial_user_username'] = initial_user.username + audit_log = AuditLog( + user=logged_in_user, + app_label=ACCESS_LOG_KOBO_AUTH_APP_LABEL, + model_name=User.__qualname__, + object_id=logged_in_user.id, + user_uid=logged_in_user.extra_details.uid, + action=AuditAction.AUTH, + metadata=metadata, + ) + return audit_log diff --git a/kobo/apps/audit_log/signals.py b/kobo/apps/audit_log/signals.py index e69de29bb2..17ba9047fb 100644 --- a/kobo/apps/audit_log/signals.py +++ b/kobo/apps/audit_log/signals.py @@ -0,0 +1,17 @@ +from django.contrib.auth.signals import user_logged_in +from django.dispatch import receiver + +from kpi.utils.log import logging +from .models import AuditLog + + +@receiver(user_logged_in) +def create_access_log(sender, user, **kwargs): + request = kwargs['request'] + if not hasattr(request, 'user'): + # This should never happen outside of tests + logging.warning('Request does not have authenticated user attached.') + log = AuditLog.create_access_log_for_request(request, user) + else: + log = AuditLog.create_access_log_for_request(request) + log.save() diff --git a/kobo/apps/audit_log/tests/api/v2/test_api_audit_log.py b/kobo/apps/audit_log/tests/api/v2/test_api_audit_log.py index 1a940e0538..c9ac1dd0f0 100644 --- a/kobo/apps/audit_log/tests/api/v2/test_api_audit_log.py +++ b/kobo/apps/audit_log/tests/api/v2/test_api_audit_log.py @@ -4,6 +4,7 @@ from rest_framework.reverse import reverse from kobo.apps.audit_log.models import AuditAction, AuditLog +from kobo.apps.audit_log.tests.test_signals import skip_login_access_log from kpi.tests.base_test_case import BaseTestCase from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE @@ -48,17 +49,20 @@ def test_list_as_superuser(self): date_created=date_created, action=AuditAction.DELETE ) - self.client.login(username='admin', password='pass') - expected = [{ - 'app_label': 'foo', - 'model_name': 'bar', - 'object_id': 1, - 'user': 'http://testserver/api/v2/users/someuser/', - 'user_uid': someuser.extra_details.uid, - 'action': 'DELETE', - 'metadata': {}, - 'date_created': date_created, - }] + with skip_login_access_log(): + self.client.login(username='admin', password='pass') + expected = [ + { + 'app_label': 'foo', + 'model_name': 'bar', + 'object_id': 1, + 'user': 'http://testserver/api/v2/users/someuser/', + 'user_uid': someuser.extra_details.uid, + 'action': 'DELETE', + 'metadata': {}, + 'date_created': date_created, + }, + ] response = self.client.get(self.audit_log_list_url) audit_logs_count = AuditLog.objects.count() assert response.status_code == status.HTTP_200_OK @@ -85,7 +89,8 @@ def test_filter_list(self): date_created=date_created, action=AuditAction.DELETE, ) - self.client.login(username='admin', password='pass') + with skip_login_access_log(): + self.client.login(username='admin', password='pass') expected = [{ 'app_label': 'foo', 'model_name': 'bar', diff --git a/kobo/apps/audit_log/tests/test_models.py b/kobo/apps/audit_log/tests/test_models.py new file mode 100644 index 0000000000..d87e99a3b8 --- /dev/null +++ b/kobo/apps/audit_log/tests/test_models.py @@ -0,0 +1,121 @@ +from unittest.mock import patch + +from django.contrib.auth.models import AnonymousUser +from django.test.client import RequestFactory +from django.urls import resolve, reverse + +from kobo.apps.audit_log.models import ( + ACCESS_LOG_KOBO_AUTH_APP_LABEL, + ACCESS_LOG_LOGINAS_AUTH_TYPE, + ACCESS_LOG_UNKNOWN_AUTH_TYPE, + AuditAction, + AuditLog, +) +from kobo.apps.kobo_auth.shortcuts import User +from kpi.tests.base_test_case import BaseTestCase + + +@patch('kobo.apps.audit_log.models.get_human_readable_client_user_agent', return_value='source') +@patch('kobo.apps.audit_log.models.get_client_ip', return_value='127.0.0.1') +class AuditLogModelTestCase(BaseTestCase): + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.super_user = User.objects.create_user( + 'user', 'user@example.com', 'userpass' + ) + cls.super_user.is_super = True + cls.super_user.backend = 'django.contrib.auth.backends.ModelBackend' + cls.super_user.save() + + def _create_request(self, url: str, cached_user, new_user): + factory = RequestFactory() + request = factory.post(url) + request.user = new_user + request._cached_user = cached_user + request.resolver_match = resolve(url) + return request + + def _check_common_fields(self, audit_log: AuditLog, user): + self.assertEqual(audit_log.user.id, user.id) + self.assertEqual(audit_log.app_label, ACCESS_LOG_KOBO_AUTH_APP_LABEL) + self.assertEqual(audit_log.model_name, 'User') + self.assertEqual(audit_log.object_id, user.id) + self.assertEqual(audit_log.user_uid, user.extra_details.uid) + self.assertEqual(audit_log.action, AuditAction.AUTH) + + def test_basic_create_auth_log_from_request(self, patched_ip, patched_source): + request = self._create_request( + reverse('kobo_login'), AnonymousUser(), AuditLogModelTestCase.super_user + ) + log: AuditLog = AuditLog.create_access_log_for_request(request) + self._check_common_fields(log, AuditLogModelTestCase.super_user) + self.assertDictEqual( + log.metadata, + { + 'ip_address': '127.0.0.1', + 'source': 'source', + 'auth_type': AuditLogModelTestCase.super_user.backend, + }, + ) + + def test_create_auth_log_from_loginas_request(self, patched_ip, patched_source): + second_user = User.objects.create_user( + 'second_user', 'second@example.com', 'pass' + ) + second_user.save() + request = self._create_request( + reverse('loginas-user-login', args=(second_user.id,)), + AuditLogModelTestCase.super_user, + second_user, + ) + log: AuditLog = AuditLog.create_access_log_for_request(request) + self._check_common_fields(log, second_user) + self.assertDictEqual( + log.metadata, + { + 'ip_address': '127.0.0.1', + 'source': 'source', + 'auth_type': ACCESS_LOG_LOGINAS_AUTH_TYPE, + 'initial_user_uid': AuditLogModelTestCase.super_user.extra_details.uid, + 'initial_user_username': AuditLogModelTestCase.super_user.username, + }, + ) + + def test_create_auth_log_with_different_auth_type(self, patched_ip, patched_source): + request = self._create_request( + reverse('api_v2:asset-list'), AnonymousUser(), AuditLogModelTestCase.super_user + ) + log: AuditLog = AuditLog.create_access_log_for_request( + request, authentication_type='Token' + ) + self._check_common_fields(log, AuditLogModelTestCase.super_user) + self.assertDictEqual( + log.metadata, + { + 'ip_address': '127.0.0.1', + 'source': 'source', + 'auth_type': 'Token', + }, + ) + + def test_create_auth_log_unknown_authenticator(self, patched_ip, patched_source): + # no backend attached to the user object + second_user = User.objects.create_user( + 'second_user', 'second@example.com', 'pass' + ) + second_user.save() + request = self._create_request( + reverse('api_v2:asset-list'), AuditLogModelTestCase.super_user, second_user + ) + log: AuditLog = AuditLog.create_access_log_for_request(request) + self._check_common_fields(log, second_user) + self.assertDictEqual( + log.metadata, + { + 'ip_address': '127.0.0.1', + 'source': 'source', + 'auth_type': ACCESS_LOG_UNKNOWN_AUTH_TYPE, + }, + ) diff --git a/kobo/apps/audit_log/tests/test_signals.py b/kobo/apps/audit_log/tests/test_signals.py new file mode 100644 index 0000000000..1524fd0454 --- /dev/null +++ b/kobo/apps/audit_log/tests/test_signals.py @@ -0,0 +1,141 @@ +import contextlib +from unittest.mock import patch + +from allauth.account.models import EmailAddress +from django.contrib.auth.signals import user_logged_in +from django.test import override_settings +from django.urls import resolve, reverse +from trench.utils import get_mfa_model + +from kobo.apps.audit_log.models import AuditAction, AuditLog +from kobo.apps.audit_log.signals import create_access_log +from kobo.apps.kobo_auth.shortcuts import User +from kpi.tests.base_test_case import BaseTestCase + + +@contextlib.contextmanager +def skip_login_access_log(): + """ + Context manager for skipping the creation of an access log on login + + Disconnects the method that creates access logs from the user_logged_in signal within the contextmanager block. + Useful when you want full control over the audit logs produced in a test. + """ + user_logged_in.disconnect(create_access_log) + yield + user_logged_in.connect(create_access_log) + + +class AuditLogSignalsTestCase(BaseTestCase): + """ + Class for testing that logins produce AuditLogs. + + Here we just test that AuditLogs are produced, not necessarily what they contain. More tests for what they contain + are in test_models.py. Also, AuditLogs for SSO logins are tested as part of the SSO tests + to avoid copying lots of complicated setup. + """ + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.user = User.objects.create_user( + 'user', 'user@example.com', 'pass' + ) + cls.user.backend = 'django.contrib.auth.backends.ModelBackend' + cls.user.save() + + @patch('kobo.apps.audit_log.signals.AuditLog.create_access_log_for_request') + def test_audit_log_created_on_login(self, patched_create): + """ + Basic plumbing test to make sure the signal is hooked up + """ + self.client.login(username='user', password='pass') + patched_create.assert_called_once() + + @override_settings(ACCOUNT_EMAIL_VERIFICATION='none') + def test_simple_login(self): + count = AuditLog.objects.count() + self.assertEqual(count, 0) + user = AuditLogSignalsTestCase.user + data = { + 'login': 'user', + 'password': 'pass', + } + response = self.client.post( + reverse('kobo_login'), data=data, follow=True + ) + audit_log = AuditLog.objects.first() + # AuditLogs are tested more fully elsewhere, just make sure one was created + self.assertEqual(audit_log.user.id, user.id) + self.assertEqual(audit_log.action, AuditAction.AUTH) + + def test_login_with_email_verification(self): + user = AuditLogSignalsTestCase.user + data = { + 'login': 'user', + 'password': 'pass', + } + self.client.post(reverse('kobo_login'), data=data, follow=True) + # no audit log should be created yet because the email has not been verified + self.assertEquals(AuditLog.objects.count(), 0) + # verify the email and try again + email: EmailAddress = EmailAddress.objects.filter(user=user).first() + email.verified = True + email.save() + self.client.post(reverse('kobo_login'), data=data, follow=True) + audit_log = AuditLog.objects.first() + self.assertEqual(audit_log.user.id, user.id) + self.assertEqual(audit_log.action, AuditAction.AUTH) + + def test_mfa_login(self): + mfa_object = get_mfa_model().objects.create( + user=AuditLogSignalsTestCase.user, + secret='dummy_mfa_secret', + name='app', + is_primary=True, + is_active=True, + _backup_codes='dummy_encoded_codes', + ) + mfa_object.save() + email_address, _ = EmailAddress.objects.get_or_create( + user=AuditLogSignalsTestCase.user + ) + email_address.primary = True + email_address.verified = True + email_address.save() + data = { + 'login': 'user', + 'password': 'pass', + } + self.client.post(reverse('kobo_login'), data=data, follow=True) + # no audit log should be created yet because the MFA code hasn't been entered + self.assertEqual(AuditLog.objects.count(), 0) + + with patch( + 'kobo.apps.accounts.mfa.forms.authenticate_second_step_command', + return_value=AuditLogSignalsTestCase.user, + ): + self.client.post( + reverse('mfa_token'), + data={'code': '123456', 'ephemeral_token': 'dummy'}, + follow=True, + ) + self.assertEqual(AuditLog.objects.count(), 1) + audit_log = AuditLog.objects.first() + self.assertEqual(audit_log.user.id, AuditLogSignalsTestCase.user.id) + self.assertEqual(audit_log.action, AuditAction.AUTH) + + def test_loginas(self): + AuditLogSignalsTestCase.user.is_superuser = True + AuditLogSignalsTestCase.user.save() + new_user = User.objects.create_user( + 'user2', 'user2@example.com', 'pass2' + ) + new_user.save() + with skip_login_access_log(): + self.client.login(username='user', password='pass') + self.client.post(reverse('loginas-user-login', args=[new_user.id])) + self.assertEqual(AuditLog.objects.count(), 1) + audit_log = AuditLog.objects.first() + self.assertEqual(audit_log.user.id, new_user.id) + self.assertEqual(audit_log.action, AuditAction.AUTH) diff --git a/kobo/apps/openrosa/apps/viewer/tests/test_viewer_tools.py b/kobo/apps/openrosa/apps/viewer/tests/test_viewer_tools.py index 4e9795584e..eed3c20c27 100644 --- a/kobo/apps/openrosa/apps/viewer/tests/test_viewer_tools.py +++ b/kobo/apps/openrosa/apps/viewer/tests/test_viewer_tools.py @@ -3,13 +3,13 @@ from django.test.client import RequestFactory from kobo.apps.openrosa.apps.main.tests.test_base import TestBase + from kobo.apps.openrosa.libs.utils.viewer_tools import ( export_def_from_filename, get_client_ip, get_human_readable_client_user_agent, ) - @ddt class TestViewerTools(TestBase): def test_export_def_from_filename(self): diff --git a/kpi/constants.py b/kpi/constants.py index faf95ff905..116729dde1 100644 --- a/kpi/constants.py +++ b/kpi/constants.py @@ -138,3 +138,7 @@ ) LIMIT_HOURS_23 = 82800 + +ACCESS_LOG_KOBO_AUTH_APP_LABEL = 'kobo_auth' +ACCESS_LOG_LOGINAS_AUTH_TYPE = 'django-loginas' +ACCESS_LOG_UNKNOWN_AUTH_TYPE = 'Unknown'