Skip to content

Commit

Permalink
feat: create audit logs on login
Browse files Browse the repository at this point in the history
  • Loading branch information
rgraber committed Aug 13, 2024
1 parent abd7820 commit c33ed16
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 17 deletions.
10 changes: 7 additions & 3 deletions kobo/apps/accounts/tests/test_backend.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
import json
from mock import patch

import responses
from allauth.socialaccount.models import SocialAccount, SocialApp
from django.conf import settings
from django.test.utils import override_settings
from django.test import TestCase
from django.test.utils import override_settings
from django.urls import reverse
from mock import patch
from rest_framework import status

from kobo.apps.openrosa.apps.main.models import UserProfile
from kobo.apps.audit_log.models import AuditAction, AuditLog
from kobo.apps.kobo_auth.shortcuts import User
from kobo.apps.openrosa.apps.main.models import UserProfile
from .constants import SOCIALACCOUNT_PROVIDERS


Expand Down Expand Up @@ -98,4 +99,7 @@ def test_keep_django_auth_backend_with_sso(self, mock_verify_and_unstash_state):
self.assertRedirects(response, reverse(settings.LOGIN_REDIRECT_URL))

self.assertTrue(response.wsgi_request.user.is_authenticated)
# Ensure there is a record of the login
audit_log: AuditLog = AuditLog.objects.filter(user=response.wsgi_request.user).first()
self.assertEquals(audit_log.action, AuditAction.AUTH)
assert response.wsgi_request.user.backend == settings.AUTHENTICATION_BACKENDS[0]
63 changes: 62 additions & 1 deletion kobo/apps/audit_log/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
import logging

from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.conf import settings
from django.utils.timezone import now

from kobo.apps.kobo_auth.shortcuts import User
from kobo.apps.openrosa.libs.utils.viewer_tools import (
get_client_ip,
get_human_readable_client_user_agent,
)
from kpi.constants import (
ACCESS_LOG_KOBO_AUTH_APP_LABEL,
ACCESS_LOG_LOGINAS_AUTH_TYPE,
ACCESS_LOG_UNKNOWN_AUTH_TYPE,
)
from kpi.fields.kpi_uid import UUID_LENGTH


Expand Down Expand Up @@ -60,3 +73,51 @@ def save(
using=using,
update_fields=update_fields,
)

@staticmethod
def create_access_log_for_request(request, user=None, authentication_type: str = None):
logged_in_user = user or request.user

# django-loginas will keep the superuser as the _cached_user while request.user is set to the new one
# sometimes there won't be a cached user at all, mostly in tests
initial_user = getattr(request, '_cached_user', None)
is_loginas_url = request.resolver_match is not None and request.resolver_match.url_name == 'loginas-user-login'
# a regular login may have an anonymous user as _cached_user, ignore that
user_changed = initial_user and initial_user.is_authenticated and initial_user.id != logged_in_user.id
is_loginas = is_loginas_url and user_changed
if authentication_type and authentication_type != '':
# authentication_type parameter has precedence
auth_type = authentication_type
elif is_loginas:
# second option: loginas
auth_type = ACCESS_LOG_LOGINAS_AUTH_TYPE
elif hasattr(logged_in_user, 'backend') and logged_in_user.backend is not None:
# third option: the backend that authenticated the user
auth_type = logged_in_user.backend
else:
# default: unknown
auth_type = ACCESS_LOG_UNKNOWN_AUTH_TYPE

# gather information about the source of the request
ip = get_client_ip(request)
source = get_human_readable_client_user_agent(request)
metadata = {
'ip_address': ip,
'source': source,
'auth_type': auth_type,
}

# add extra information if needed for django-loginas
if is_loginas:
metadata['initial_user_uid'] = initial_user.extra_details.uid
metadata['initial_user_username'] = initial_user.username
audit_log = AuditLog(
user=logged_in_user,
app_label=ACCESS_LOG_KOBO_AUTH_APP_LABEL,
model_name=User.__qualname__,
object_id=logged_in_user.id,
user_uid=logged_in_user.extra_details.uid,
action=AuditAction.AUTH,
metadata=metadata,
)
return audit_log
17 changes: 17 additions & 0 deletions kobo/apps/audit_log/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from django.contrib.auth.signals import user_logged_in
from django.dispatch import receiver

from kpi.utils.log import logging
from .models import AuditLog


@receiver(user_logged_in)
def create_access_log(sender, user, **kwargs):
request = kwargs['request']
if not hasattr(request, 'user'):
# This should never happen outside of tests
logging.warning('Request does not have authenticated user attached.')
log = AuditLog.create_access_log_for_request(request, user)
else:
log = AuditLog.create_access_log_for_request(request)
log.save()
29 changes: 17 additions & 12 deletions kobo/apps/audit_log/tests/api/v2/test_api_audit_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from rest_framework.reverse import reverse

from kobo.apps.audit_log.models import AuditAction, AuditLog
from kobo.apps.audit_log.tests.test_signals import skip_login_access_log
from kpi.tests.base_test_case import BaseTestCase
from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE

Expand Down Expand Up @@ -48,17 +49,20 @@ def test_list_as_superuser(self):
date_created=date_created,
action=AuditAction.DELETE
)
self.client.login(username='admin', password='pass')
expected = [{
'app_label': 'foo',
'model_name': 'bar',
'object_id': 1,
'user': 'http://testserver/api/v2/users/someuser/',
'user_uid': someuser.extra_details.uid,
'action': 'DELETE',
'metadata': {},
'date_created': date_created,
}]
with skip_login_access_log():
self.client.login(username='admin', password='pass')
expected = [
{
'app_label': 'foo',
'model_name': 'bar',
'object_id': 1,
'user': 'http://testserver/api/v2/users/someuser/',
'user_uid': someuser.extra_details.uid,
'action': 'DELETE',
'metadata': {},
'date_created': date_created,
},
]
response = self.client.get(self.audit_log_list_url)
audit_logs_count = AuditLog.objects.count()
assert response.status_code == status.HTTP_200_OK
Expand All @@ -85,7 +89,8 @@ def test_filter_list(self):
date_created=date_created,
action=AuditAction.DELETE,
)
self.client.login(username='admin', password='pass')
with skip_login_access_log():
self.client.login(username='admin', password='pass')
expected = [{
'app_label': 'foo',
'model_name': 'bar',
Expand Down
121 changes: 121 additions & 0 deletions kobo/apps/audit_log/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from unittest.mock import patch

from django.contrib.auth.models import AnonymousUser
from django.test.client import RequestFactory
from django.urls import resolve, reverse

from kobo.apps.audit_log.models import (
ACCESS_LOG_KOBO_AUTH_APP_LABEL,
ACCESS_LOG_LOGINAS_AUTH_TYPE,
ACCESS_LOG_UNKNOWN_AUTH_TYPE,
AuditAction,
AuditLog,
)
from kobo.apps.kobo_auth.shortcuts import User
from kpi.tests.base_test_case import BaseTestCase


@patch('kobo.apps.audit_log.models.get_human_readable_client_user_agent', return_value='source')
@patch('kobo.apps.audit_log.models.get_client_ip', return_value='127.0.0.1')
class AuditLogModelTestCase(BaseTestCase):

@classmethod
def setUpClass(cls):
super().setUpClass()
cls.super_user = User.objects.create_user(
'user', '[email protected]', 'userpass'
)
cls.super_user.is_super = True
cls.super_user.backend = 'django.contrib.auth.backends.ModelBackend'
cls.super_user.save()

def _create_request(self, url: str, cached_user, new_user):
factory = RequestFactory()
request = factory.post(url)
request.user = new_user
request._cached_user = cached_user
request.resolver_match = resolve(url)
return request

def _check_common_fields(self, audit_log: AuditLog, user):
self.assertEqual(audit_log.user.id, user.id)
self.assertEqual(audit_log.app_label, ACCESS_LOG_KOBO_AUTH_APP_LABEL)
self.assertEqual(audit_log.model_name, 'User')
self.assertEqual(audit_log.object_id, user.id)
self.assertEqual(audit_log.user_uid, user.extra_details.uid)
self.assertEqual(audit_log.action, AuditAction.AUTH)

def test_basic_create_auth_log_from_request(self, patched_ip, patched_source):
request = self._create_request(
reverse('kobo_login'), AnonymousUser(), AuditLogModelTestCase.super_user
)
log: AuditLog = AuditLog.create_access_log_for_request(request)
self._check_common_fields(log, AuditLogModelTestCase.super_user)
self.assertDictEqual(
log.metadata,
{
'ip_address': '127.0.0.1',
'source': 'source',
'auth_type': AuditLogModelTestCase.super_user.backend,
},
)

def test_create_auth_log_from_loginas_request(self, patched_ip, patched_source):
second_user = User.objects.create_user(
'second_user', '[email protected]', 'pass'
)
second_user.save()
request = self._create_request(
reverse('loginas-user-login', args=(second_user.id,)),
AuditLogModelTestCase.super_user,
second_user,
)
log: AuditLog = AuditLog.create_access_log_for_request(request)
self._check_common_fields(log, second_user)
self.assertDictEqual(
log.metadata,
{
'ip_address': '127.0.0.1',
'source': 'source',
'auth_type': ACCESS_LOG_LOGINAS_AUTH_TYPE,
'initial_user_uid': AuditLogModelTestCase.super_user.extra_details.uid,
'initial_user_username': AuditLogModelTestCase.super_user.username,
},
)

def test_create_auth_log_with_different_auth_type(self, patched_ip, patched_source):
request = self._create_request(
reverse('api_v2:asset-list'), AnonymousUser(), AuditLogModelTestCase.super_user
)
log: AuditLog = AuditLog.create_access_log_for_request(
request, authentication_type='Token'
)
self._check_common_fields(log, AuditLogModelTestCase.super_user)
self.assertDictEqual(
log.metadata,
{
'ip_address': '127.0.0.1',
'source': 'source',
'auth_type': 'Token',
},
)

def test_create_auth_log_unknown_authenticator(self, patched_ip, patched_source):
# no backend attached to the user object
second_user = User.objects.create_user(
'second_user', '[email protected]', 'pass'
)
second_user.save()
request = self._create_request(
reverse('api_v2:asset-list'), AuditLogModelTestCase.super_user, second_user
)
log: AuditLog = AuditLog.create_access_log_for_request(request)
self._check_common_fields(log, second_user)
self.assertDictEqual(
log.metadata,
{
'ip_address': '127.0.0.1',
'source': 'source',
'auth_type': ACCESS_LOG_UNKNOWN_AUTH_TYPE,
},
)
Loading

0 comments on commit c33ed16

Please sign in to comment.