diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0804b100..51d6cc57 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,18 @@ Change Log Unreleased ********** +0.23.0 - 2026-02-18 +******************** + +Added +===== + +* Add authz_migrate_course_authoring command to migrate legacy CourseAccessRole data to the new Authz (Casbin-based) system +* Add authz_rollback_course_authoring command to rollback Authz roles back to legacy CourseAccessRole +* Support optional --delete flag for controlled cleanup of source permissions after successful migration +* Add migrate_legacy_course_roles_to_authz and migrate_authz_to_legacy_course_roles service functions +* Add unit tests to verify migration and command behavior + Added ===== diff --git a/openedx_authz/__init__.py b/openedx_authz/__init__.py index de3fced5..86dbaad4 100644 --- a/openedx_authz/__init__.py +++ b/openedx_authz/__init__.py @@ -4,6 +4,6 @@ import os -__version__ = "0.22.0" +__version__ = "0.23.0" ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__)) diff --git a/openedx_authz/admin.py b/openedx_authz/admin.py index e2285158..52470867 100644 --- a/openedx_authz/admin.py +++ b/openedx_authz/admin.py @@ -5,6 +5,8 @@ from django.contrib import admin from openedx_authz.models import ExtendedCasbinRule +from openedx_authz.models.scopes import CourseScope +from openedx_authz.models.subjects import UserSubject class CasbinRuleForm(forms.ModelForm): @@ -48,3 +50,18 @@ class CasbinRuleAdmin(admin.ModelAdmin): # TODO: In a future, possibly we should only show an inline for the rules that # have an extended rule, and show the subject and scope information in detail. inlines = [ExtendedCasbinRuleInline] + + +@admin.register(ExtendedCasbinRule) +class ExtendedCasbinRuleAdmin(admin.ModelAdmin): + pass + + +@admin.register(UserSubject) +class UserSubjectAdmin(admin.ModelAdmin): + pass + + +@admin.register(CourseScope) +class CourseScopeAdmin(admin.ModelAdmin): + list_display = ("id", "course_overview") diff --git a/openedx_authz/engine/utils.py b/openedx_authz/engine/utils.py index 5d15fd65..133a9859 100644 --- a/openedx_authz/engine/utils.py +++ b/openedx_authz/engine/utils.py @@ -8,14 +8,33 @@ from casbin import Enforcer -from openedx_authz.api.users import assign_role_to_user_in_scope, batch_assign_role_to_users_in_scope -from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_AUTHOR, LIBRARY_USER +from openedx_authz.api.data import CourseOverviewData +from openedx_authz.api.users import ( + assign_role_to_user_in_scope, + batch_assign_role_to_users_in_scope, + batch_unassign_role_from_users, + get_user_role_assignments, +) +from openedx_authz.constants.roles import ( + COURSE_ADMIN, + COURSE_DATA_RESEARCHER, + COURSE_LIMITED_STAFF, + COURSE_STAFF, + LEGACY_COURSE_ROLE_EQUIVALENCES, + LIBRARY_ADMIN, + LIBRARY_AUTHOR, + LIBRARY_USER, +) logger = logging.getLogger(__name__) GROUPING_POLICY_PTYPES = ["g", "g2", "g3", "g4", "g5", "g6"] +# Map new roles back to legacy roles for rollback purposes +COURSE_ROLE_EQUIVALENCES = {v: k for k, v in LEGACY_COURSE_ROLE_EQUIVALENCES.items()} + + def migrate_policy_between_enforcers( source_enforcer: Enforcer, target_enforcer: Enforcer, @@ -151,3 +170,148 @@ def migrate_legacy_permissions(ContentLibraryPermission): ) return permissions_with_errors + + +def migrate_legacy_course_roles_to_authz(CourseAccessRole, delete_after_migration): + """ + Migrate legacy course role data to the new Casbin-based authorization model. + This function reads legacy permissions from the CourseAccessRole model + and assigns equivalent roles in the new authorization system. + + The old Course permissions are stored in the CourseAccessRole model, it consists of the following columns: + + - user: FK to User + - org: optional Organization string + - course_id: optional CourseKeyField of Course + - role: 'instructor' | 'staff' | 'limited_staff' | 'data_researcher' + + In the new Authz model, this would roughly translate to: + + - course_id: scope + - user: subject + - role: role + + param CourseAccessRole: The CourseAccessRole model to use. + """ + + legacy_permissions = ( + CourseAccessRole.objects.filter(course_id__startswith="course-v1:").select_related("user").all() + ) + + # List to keep track of any permissions that could not be migrated + permissions_with_errors = [] + permissions_with_no_errors = [] + + for permission in legacy_permissions: + # Migrate the permission to the new model + + # Derive equivalent role based on access level + map_legacy_role = { + "instructor": COURSE_ADMIN, + "staff": COURSE_STAFF, + "limited_staff": COURSE_LIMITED_STAFF, + "data_researcher": COURSE_DATA_RESEARCHER, + } + + role = map_legacy_role.get(permission.role) + if role is None: + # This should not happen as there are no more access_levels defined + # in CourseAccessRole, log and skip + logger.error(f"Unknown access level: {permission.role} for User: {permission.user}") + permissions_with_errors.append(permission) + continue + + # Permission applied to individual user + logger.info( + f"Migrating permission for User: {permission.user.username} " + f"to Role: {role.external_key} in Scope: {permission.course_id}" + ) + + is_user_added = assign_role_to_user_in_scope( + user_external_key=permission.user.username, + role_external_key=role.external_key, + scope_external_key=str(permission.course_id), + ) + + if not is_user_added: + logger.error( + f"Failed to migrate permission for User: {permission.user.username} " + f"to Role: {role.external_key} in Scope: {permission.course_id}" + ) + permissions_with_errors.append(permission) + continue + + permissions_with_no_errors.append(permission) + + if delete_after_migration: + CourseAccessRole.objects.filter(id__in=[p.id for p in permissions_with_no_errors]).delete() + + return permissions_with_errors + + +def migrate_authz_to_legacy_course_roles(CourseAccessRole, UserSubject, delete_after_migration): + """ + Migrate permissions from the new Casbin-based authorization model back to the legacy CourseAccessRole model. + This function reads permissions from the Casbin enforcer and creates equivalent entries in the + CourseAccessRole model. + + This is essentially the reverse of migrate_legacy_course_roles_to_authz and is intended + for rollback purposes in case of migration issues. + """ + # 1. Get all users with course-related permissions in the new model by filtering + # UserSubjects that are linked to CourseScopes with a valid course overview. + course_subjects = ( + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) + .select_related("user") + .distinct() + ) + + roles_with_errors = [] + + for course_subject in course_subjects: + user = course_subject.user + user_external_key = user.username + + # 2. Get all role assignments for the user + role_assignments = get_user_role_assignments(user_external_key=user_external_key) + + for assignment in role_assignments: + if not isinstance(assignment.scope, CourseOverviewData): + logger.error(f"Skipping role assignment for User: {user_external_key} due to missing course scope.") + continue + + scope = assignment.scope.external_key + + course_overview = assignment.scope.get_object() + + for role in assignment.roles: + legacy_role = COURSE_ROLE_EQUIVALENCES.get(role.external_key) + if legacy_role is None: + logger.error(f"Unknown role: {role} for User: {user_external_key}") + roles_with_errors.append((user_external_key, role.external_key, scope)) + continue + + try: + # Create legacy CourseAccessRole entry + CourseAccessRole.objects.get_or_create( + user=user, + org=course_overview.org, + course_id=scope, + role=legacy_role, + ) + except Exception as e: # pylint: disable=broad-exception-caught + logger.error( + f"Error creating CourseAccessRole for User: " + f"{user_external_key}, Role: {legacy_role}, Course: {scope}: {e}" + ) + roles_with_errors.append((user_external_key, role.external_key, scope)) + continue + + # If we successfully created the legacy role, we can unassign the new role + if delete_after_migration: + batch_unassign_role_from_users( + users=[user_external_key], + role_external_key=role.external_key, + scope_external_key=scope, + ) + return roles_with_errors diff --git a/openedx_authz/management/commands/authz_migrate_course_authoring.py b/openedx_authz/management/commands/authz_migrate_course_authoring.py new file mode 100644 index 00000000..1deed9b4 --- /dev/null +++ b/openedx_authz/management/commands/authz_migrate_course_authoring.py @@ -0,0 +1,63 @@ +""" +Django management command to migrate legacy course authoring roles to the new Authz (Casbin-based) authorization system. +""" + +from django.core.management.base import BaseCommand +from django.db import transaction + +from openedx_authz.engine.utils import migrate_legacy_course_roles_to_authz + +try: + from common.djangoapps.student.models import CourseAccessRole +except ImportError: + CourseAccessRole = None # type: ignore + + +class Command(BaseCommand): + """ + Django command to migrate legacy CourseAccessRole data + to the new Authz (Casbin-based) authorization system. + """ + + help = "Migrate legacy course authoring roles to the new Authz system." + + def add_arguments(self, parser): + parser.add_argument( + "--delete", + action="store_true", + help="Delete legacy CourseAccessRole records after successful migration.", + ) + + def handle(self, *args, **options): + delete_after_migration = options["delete"] + + self.stdout.write(self.style.WARNING("Starting legacy → Authz migration...")) + + try: + if delete_after_migration: + confirm = input( + "Are you sure you want to delete successfully migrated legacy roles? Type 'yes' to continue: " + ) + + if confirm != "yes": + self.stdout.write(self.style.WARNING("Deletion aborted.")) + return + with transaction.atomic(): + errors = migrate_legacy_course_roles_to_authz( + CourseAccessRole=CourseAccessRole, + delete_after_migration=delete_after_migration, + ) + + if errors: + self.stdout.write(self.style.ERROR(f"Migration completed with {len(errors)} errors.")) + else: + self.stdout.write(self.style.SUCCESS("Migration completed successfully with no errors.")) + + if delete_after_migration: + self.stdout.write(self.style.SUCCESS("Legacy roles deleted successfully.")) + + except Exception as exc: + self.stdout.write(self.style.ERROR(f"Migration failed due to unexpected error: {exc}")) + raise + + self.stdout.write(self.style.SUCCESS("Done.")) diff --git a/openedx_authz/management/commands/authz_rollback_course_authoring.py b/openedx_authz/management/commands/authz_rollback_course_authoring.py new file mode 100644 index 00000000..0b4334a8 --- /dev/null +++ b/openedx_authz/management/commands/authz_rollback_course_authoring.py @@ -0,0 +1,67 @@ +""" +Django management command to rollback course authoring roles from the new Authz (Casbin-based) +authorization system back to the legacy CourseAccessRole model. +""" + +from django.core.management.base import BaseCommand +from django.db import transaction + +from openedx_authz.engine.utils import migrate_authz_to_legacy_course_roles +from openedx_authz.models.subjects import UserSubject + +try: + from common.djangoapps.student.models import CourseAccessRole +except ImportError: + CourseAccessRole = None # type: ignore + + +class Command(BaseCommand): + """ + Django command to rollback course authoring roles + from the new Authz system back to legacy CourseAccessRole. + """ + + help = "Rollback Authz course authoring roles to legacy CourseAccessRole." + + def add_arguments(self, parser): + parser.add_argument( + "--delete", + action="store_true", + help="Delete Authz role assignments after successful rollback.", + ) + + def handle(self, *args, **options): + delete_after_migration = options["delete"] + + self.stdout.write(self.style.WARNING("Starting Authz → Legacy rollback migration...")) + + try: + if delete_after_migration: + confirm = input( + "Are you sure you want to remove the new Authz role " + "assignments after rollback? Type 'yes' to continue: " + ) + + if confirm != "yes": + self.stdout.write(self.style.WARNING("Deletion aborted.")) + return + with transaction.atomic(): + errors = migrate_authz_to_legacy_course_roles( + CourseAccessRole=CourseAccessRole, + UserSubject=UserSubject, + delete_after_migration=delete_after_migration, # control deletion here + ) + + if errors: + self.stdout.write(self.style.ERROR(f"Rollback completed with {len(errors)} errors.")) + else: + self.stdout.write(self.style.SUCCESS("Rollback completed successfully with no errors.")) + + if delete_after_migration: + self.stdout.write(self.style.SUCCESS("Authz role assignments removed successfully.")) + + except Exception as exc: + self.stdout.write(self.style.ERROR(f"Rollback failed due to unexpected error: {exc}")) + raise + + self.stdout.write(self.style.SUCCESS("Done.")) diff --git a/openedx_authz/tests/stubs/models.py b/openedx_authz/tests/stubs/models.py index c64995aa..86c39079 100644 --- a/openedx_authz/tests/stubs/models.py +++ b/openedx_authz/tests/stubs/models.py @@ -125,3 +125,57 @@ def get_from_id(cls, course_key): key = str(course_key) obj, _ = cls.objects.get_or_create(id=key) return obj + + +class NoneToEmptyQuerySet(models.query.QuerySet): + """ + A :class:`django.db.query.QuerySet` that replaces `None` values passed to `filter` and `exclude` + with the corresponding `Empty` value for all fields with an `Empty` attribute. + + This is to work around Django automatically converting `exact` queries for `None` into + `isnull` queries before the field has a chance to convert them to queries for it's own + empty value. + """ + + def _filter_or_exclude(self, *args, **kwargs): + for field_object in self.model._meta.get_fields(): + direct = not field_object.auto_created or field_object.concrete + if direct and hasattr(field_object, "Empty"): + for suffix in ("", "_exact"): + key = f"{field_object.name}{suffix}" + if key in kwargs and kwargs[key] is None: + kwargs[key] = field_object.Empty + + return super()._filter_or_exclude(*args, **kwargs) + + +class NoneToEmptyManager(models.Manager): + """ + A :class:`django.db.models.Manager` that has a :class:`NoneToEmptyQuerySet` + as its `QuerySet`, initialized with a set of specified `field_names`. + """ + + def get_queryset(self): + """ + Returns the result of NoneToEmptyQuerySet instead of a regular QuerySet. + """ + return NoneToEmptyQuerySet(self.model, using=self._db) + + +class CourseAccessRole(models.Model): + """ + Maps users to org, courses, and roles. Used by student.roles.CourseRole and OrgRole. + To establish a user as having a specific role over all courses in the org, create an entry + without a course_id. + + .. no_pii: + """ + + objects = NoneToEmptyManager() + + user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE) + # blank org is for global group based roles such as course creator (may be deprecated) + org = models.CharField(max_length=64, db_index=True, blank=True) + # blank course_id implies org wide role + course_id = CourseKeyField(max_length=255, db_index=True, blank=True) + role = models.CharField(max_length=64, db_index=True) diff --git a/openedx_authz/tests/test_migrations.py b/openedx_authz/tests/test_migrations.py index 5d612d01..0a9d2c55 100644 --- a/openedx_authz/tests/test_migrations.py +++ b/openedx_authz/tests/test_migrations.py @@ -1,14 +1,36 @@ """Unit Tests for openedx_authz migrations.""" +from unittest.mock import patch + from django.contrib.auth import get_user_model from django.contrib.auth.models import Group +from django.core.management import call_command from django.test import TestCase from openedx_authz.api.users import batch_unassign_role_from_users, get_user_role_assignments_in_scope -from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_USER +from openedx_authz.constants.roles import ( + COURSE_ADMIN, + COURSE_DATA_RESEARCHER, + COURSE_LIMITED_STAFF, + COURSE_STAFF, + LEGACY_COURSE_ROLE_EQUIVALENCES, + LIBRARY_ADMIN, + LIBRARY_USER, +) from openedx_authz.engine.enforcer import AuthzEnforcer -from openedx_authz.engine.utils import migrate_legacy_permissions -from openedx_authz.tests.stubs.models import ContentLibrary, ContentLibraryPermission, Organization +from openedx_authz.engine.utils import ( + migrate_authz_to_legacy_course_roles, + migrate_legacy_course_roles_to_authz, + migrate_legacy_permissions, +) +from openedx_authz.models.subjects import UserSubject +from openedx_authz.tests.stubs.models import ( + ContentLibrary, + ContentLibraryPermission, + CourseAccessRole, + CourseOverview, + Organization, +) User = get_user_model() @@ -26,8 +48,8 @@ empty_group_name = f"{OBJECT_PREFIX}empty_group" -class TestLegacyPermissionsMigration(TestCase): - """Test cases for migrating legacy permissions.""" +class TestLegacyContentLibraryPermissionsMigration(TestCase): + """Test cases for migrating legacy content library permissions.""" def setUp(self): """ @@ -146,3 +168,587 @@ def test_migration(self): self.assertEqual(assignments[0].roles[0], LIBRARY_USER) self.assertEqual(len(permissions_with_errors), 2) + + +class TestLegacyCourseAuthoringPermissionsMigration(TestCase): + """Test cases for migrating legacy course authoring permissions.""" + + def setUp(self): + """ + Set up test data: + + What this does: + 1. Defines an Org and a CourseKey for the test course + 2. Create Users for each legacy role and an additional user for testing invalid permissions + 3. Assign legacy permissions using CourseAccessRole for each user and role combination + 4. Create invalid permissions for user to test error logging + - Notes: + - CourseAccessRole does not have a group concept, so we are only assigning + permissions to individual users in this test. + - The only roles we are migrating are instructor, staff, limited_staff and data_researcher, + any other role in CourseAccessRole will be considered invalid for the purpose of this test. + """ + + # Defining course identifiers + self.org = org_short_name + self.course_id = f"course-v1:{self.org}+{OBJECT_PREFIX}course+2024" + default_course_fields = { + "org": self.org, + "course_id": self.course_id, + } + self.course_overview = CourseOverview.objects.create( + id=self.course_id, org=self.org, display_name=f"{OBJECT_PREFIX} Course" + ) + + # Create lists to hold legacy role objects for cleanup and verification purposes + self.admin_legacy_roles = [] + self.staff_legacy_roles = [] + self.limited_staff_legacy_roles = [] + self.data_researcher_legacy_roles = [] + + # Create users for each legacy role and an additional user for testing invalid permissions + self.admin_users = [ + User.objects.create_user(username=f"admin_{user_name}", email=f"admin_{user_name}@example.com") + for user_name in user_names + ] + + self.staff_users = [ + User.objects.create_user(username=f"staff_{user_name}", email=f"staff_{user_name}@example.com") + for user_name in user_names + ] + + self.limited_staff = [ + User.objects.create_user( + username=f"limited_staff_{user_name}", email=f"limited_staff_{user_name}@example.com" + ) + for user_name in user_names + ] + + self.data_researcher = [ + User.objects.create_user( + username=f"data_researcher_{user_name}", email=f"data_researcher_{user_name}@example.com" + ) + for user_name in user_names + ] + + self.error_user = User.objects.create_user(username=error_user_name, email=f"{error_user_name}@example.com") + + # Assign legacy permissions for users based on their role + for user in self.admin_users: + leg_role = CourseAccessRole.objects.create( + **default_course_fields, + user=user, + role="instructor", + ) + self.admin_legacy_roles.append(leg_role) + + for user in self.staff_users: + leg_role = CourseAccessRole.objects.create( + **default_course_fields, + user=user, + role="staff", + ) + self.staff_legacy_roles.append(leg_role) + + for user in self.limited_staff: + leg_role = CourseAccessRole.objects.create( + **default_course_fields, + user=user, + role="limited_staff", + ) + self.limited_staff_legacy_roles.append(leg_role) + + for user in self.data_researcher: + leg_role = CourseAccessRole.objects.create( + **default_course_fields, + user=user, + role="data_researcher", + ) + self.data_researcher_legacy_roles.append(leg_role) + + # Create invalid permission for testing error logging + CourseAccessRole.objects.create( + **default_course_fields, + user=self.error_user, + role="invalid-legacy-role", + ) + + def tearDown(self): + """ + Clean up test data created for the migration test. + """ + super().tearDown() + AuthzEnforcer.get_enforcer().load_policy() + + admin_users_names = [user.username for user in self.admin_users] + staff_users_names = [user.username for user in self.staff_users] + limited_staff_users_names = [user.username for user in self.limited_staff] + data_researcher_users_names = [user.username for user in self.data_researcher] + + batch_unassign_role_from_users( + users=admin_users_names, + role_external_key=COURSE_ADMIN.external_key, + scope_external_key=self.course_id, + ) + batch_unassign_role_from_users( + users=staff_users_names, + role_external_key=COURSE_STAFF.external_key, + scope_external_key=self.course_id, + ) + batch_unassign_role_from_users( + users=limited_staff_users_names, + role_external_key=COURSE_LIMITED_STAFF.external_key, + scope_external_key=self.course_id, + ) + batch_unassign_role_from_users( + users=data_researcher_users_names, + role_external_key=COURSE_DATA_RESEARCHER.external_key, + scope_external_key=self.course_id, + ) + + def test_legacy_course_role_equivalences_mapping(self): + """Test that the LEGACY_COURSE_ROLE_EQUIVALENCES mapping contains no duplicate values.""" + legacy_roles = LEGACY_COURSE_ROLE_EQUIVALENCES.keys() + new_roles = LEGACY_COURSE_ROLE_EQUIVALENCES.values() + + # Check that there are no duplicate values in the mapping + self.assertEqual( + len(legacy_roles), len(set(new_roles)), "LEGACY_COURSE_ROLE_EQUIVALENCES contains duplicate values" + ) + + @patch("openedx_authz.api.data.CourseOverview", CourseOverview) + def test_migrate_legacy_course_roles_to_authz_and_rollback_no_deletion(self): + """Test the migration of legacy permissions from CourseAccessRole to the new Casbin-based model + and the rollback functionality without deletion. + + 1. Run the migration to migrate legacy permissions from CourseAccessRole to the + new model with delete_after_migration set to False. + - Notes: + - The migration function should correctly map legacy roles to + the new roles based on the defined mapping in the migration function. + - Any legacy role that does not have a defined mapping should be logged as an error + and not migrated. + - After migration, all legacy CourseAccessRole entries should not be deleted + since we set delete_after_migration to False. + 2. Check that each user has the expected role in the new model. + 3. Check that invalid permissions were identified correctly as errors. + 4. Rollback the migration and check that each user has the expected legacy role and + that all migrated permissions were rolled back successfully. + """ + + # Capture the old state of permissions for rollback testing + original_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + self.assertEqual( + len(user_names), 3 + ) # Sanity check to ensure we have the expected number of users for each role + self.assertEqual( + len(original_state_access_roles), 13 + ) # 3 users for each of the 4 roles + 1 invalid role = 13 total entries + + # Migrate from legacy CourseAccessRole to new Casbin-based model + permissions_with_errors = migrate_legacy_course_roles_to_authz(CourseAccessRole, delete_after_migration=False) + AuthzEnforcer.get_enforcer().load_policy() + for user in self.admin_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_ADMIN) + for user in self.staff_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_STAFF) + for user in self.limited_staff: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_LIMITED_STAFF) + for user in self.data_researcher: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_DATA_RESEARCHER) + self.assertEqual(len(permissions_with_errors), 1) + self.assertEqual(permissions_with_errors[0].user.username, self.error_user.username) + self.assertEqual(permissions_with_errors[0].role, "invalid-legacy-role") + + after_migrate_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + + # 3 users for each of the 4 roles + 1 invalid role = 13 total entries + self.assertEqual(len(after_migrate_state_access_roles), 13) + # Must be the same before and after migration since we set delete_after_migration to False + self.assertEqual(original_state_access_roles, after_migrate_state_access_roles) + + # Now let's rollback + + # Capture the state of permissions before rollback to verify that rollback restores the original state + original_state_user_subjects = list( + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) + .distinct() + .order_by("id") + .values("id", "user_id") + ) + original_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + + permissions_with_errors = migrate_authz_to_legacy_course_roles( + CourseAccessRole, UserSubject, delete_after_migration=False + ) + + # Check that each user has the expected legacy role after rollback and that errors + # are logged for any permissions that could not be rolled back + for user in self.admin_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_ADMIN) + for user in self.staff_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_STAFF) + for user in self.limited_staff: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_LIMITED_STAFF) + for user in self.data_researcher: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_DATA_RESEARCHER) + self.assertEqual(len(permissions_with_errors), 0) + + state_after_migration_user_subjects = list( + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) + .distinct() + .order_by("id") + .values("id", "user_id") + ) + after_migrate_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + + # The number of CourseAccessRole entries should be the same as the original state + # since we are not deleting any entries in this test. + self.assertEqual(len(original_state_access_roles), 13) + + # All original entries should still be there since we are not deleting any entries + # and when creating new entries for the users that were migrated back to legacy roles, + # we are creating them with get_or_create which will not create duplicates if an entry + # with the same user, org, course_id and role already exists. + self.assertEqual(len(after_migrate_state_access_roles), 13) + + # Sanity check to ensure we have the expected number of UserSubjects related to + # the course permissions before migration (3 users * 4 roles = 12) + self.assertEqual(len(original_state_user_subjects), 12) + + # After rollback, we should have the same 12 UserSubjects related to the course permissions + # since we are not deleting any entries in this test, + self.assertEqual(len(state_after_migration_user_subjects), 12) + + @patch("openedx_authz.api.data.CourseOverview", CourseOverview) + def test_migrate_legacy_course_roles_to_authz_and_rollback_with_deletion(self): + """Test the migration of legacy permissions from CourseAccessRole to + the new Casbin-based model with deletion of legacy permissions after migration. + + 1. Run the migration to migrate legacy permissions from CourseAccessRole to the + new model with delete_after_migration set to True. + - Notes: + - The migration function should correctly map legacy roles to + the new roles based on the defined mapping in the migration function. + - Any legacy role that does not have a defined mapping should be logged as an error + and not migrated. + - After migration, all legacy CourseAccessRole entries should be deleted + since we set delete_after_migration to True. + 2. Check that each user has the expected role in the new model. + 3. Check that invalid permissions were identified correctly as errors. + 4. Rollback the migration and check that each user has the expected legacy role and + that all migrated permissions were rolled back successfully. + """ + + # Capture the old state of permissions for rollback testing + original_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + self.assertEqual( + len(user_names), 3 + ) # Sanity check to ensure we have the expected number of users for each role + self.assertEqual( + len(original_state_access_roles), 13 + ) # 3 users for each of the 4 roles + 1 invalid role = 13 total entries + + # Migrate from legacy CourseAccessRole to new Casbin-based model + permissions_with_errors = migrate_legacy_course_roles_to_authz(CourseAccessRole, delete_after_migration=True) + AuthzEnforcer.get_enforcer().load_policy() + for user in self.admin_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_ADMIN) + for user in self.staff_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_STAFF) + for user in self.limited_staff: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_LIMITED_STAFF) + for user in self.data_researcher: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 1) + self.assertEqual(assignments[0].roles[0], COURSE_DATA_RESEARCHER) + self.assertEqual(len(permissions_with_errors), 1) + self.assertEqual(permissions_with_errors[0].user.username, self.error_user.username) + self.assertEqual(permissions_with_errors[0].role, "invalid-legacy-role") + + after_migrate_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + + self.assertEqual(len(original_state_access_roles), 13) + + # Only the invalid role entry should remain since we set delete_after_migration to True + self.assertEqual(len(after_migrate_state_access_roles), 1) + + # Must be different before and after migration since we set delete_after_migration + # to True and we are deleting all + self.assertNotEqual(original_state_access_roles, after_migrate_state_access_roles) + + # Now let's rollback + + # Capture the state of permissions before rollback to verify that rollback restores the original state + original_state_user_subjects = list( + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) + .distinct() + .order_by("id") + .values("id", "user_id") + ) + original_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + + permissions_with_errors = migrate_authz_to_legacy_course_roles( + CourseAccessRole, UserSubject, delete_after_migration=True + ) + + # Check that each user has the expected legacy role after rollback + # and that errors are logged for any permissions that could not be rolled back + for user in self.admin_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 0) + for user in self.staff_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 0) + for user in self.limited_staff: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 0) + for user in self.data_researcher: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 0) + self.assertEqual(len(permissions_with_errors), 0) + + state_after_migration_user_subjects = list( + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) + .distinct() + .order_by("id") + .values("id", "user_id") + ) + after_migrate_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + + # Before the rollback, we should only have the 1 invalid role entry + # since we set delete_after_migration to True in the migration. + self.assertEqual(len(original_state_access_roles), 1) + + # All original entries + 3 users * 4 roles = 12 + # plus the original invalid entry = 1 + 12 = 13 total entries + self.assertEqual(len(after_migrate_state_access_roles), 1 + 12) + + # Sanity check to ensure we have the expected number of UserSubjects related to + # the course permissions before migration (3 users * 4 roles = 12) + self.assertEqual(len(original_state_user_subjects), 12) + + # After rollback, we should have 0 UserSubjects related to the course permissions + self.assertEqual(len(state_after_migration_user_subjects), 0) + + @patch("openedx_authz.api.data.CourseOverview", CourseOverview) + def test_migrate_legacy_course_roles_to_authz_and_rollback_with_no_new_role_equivalent(self): + """Test the migration of legacy course roles to the new Casbin-based model + and the rollback when there is no equivalent new role. + """ + + # Migrate from legacy CourseAccessRole to new Casbin-based model + permissions_with_errors = migrate_legacy_course_roles_to_authz(CourseAccessRole, delete_after_migration=True) + AuthzEnforcer.get_enforcer().load_policy() + + # Now let's rollback + + # Capture the state of permissions before rollback to verify that rollback restores the original state + original_state_user_subjects = list( + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) + .distinct() + .order_by("id") + .values("id", "user_id") + ) + original_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + + # Mock the COURSE_ROLE_EQUIVALENCES mapping to only include a mapping + # for COURSE_ADMIN to simulate the scenario where the staff, limited_staff + # and data_researcher roles do not have a legacy role equivalent and + # therefore cannot be migrated back to legacy roles during the rollback. + with patch( + "openedx_authz.engine.utils.COURSE_ROLE_EQUIVALENCES", + {COURSE_ADMIN.external_key: "instructor"}, + ): + permissions_with_errors = migrate_authz_to_legacy_course_roles( + CourseAccessRole, UserSubject, delete_after_migration=True + ) + + # Check that each user has the expected legacy role after rollback + # and that errors are logged for any permissions that could not be rolled back + for user in self.admin_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + self.assertEqual(len(assignments), 0) + for user in self.staff_users: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + # Since we are mocking the COURSE_ROLE_EQUIVALENCES mapping to only include a mapping for COURSE_ADMIN, + # the staff role will not have a legacy role equivalent and therefore should not be migrated back + self.assertEqual(len(assignments), 1) + for user in self.limited_staff: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + # Since we are mocking the COURSE_ROLE_EQUIVALENCES mapping to only include a mapping for COURSE_ADMIN, + # the limited_staff role will not have a legacy role equivalent and therefore should not be migrated back + self.assertEqual(len(assignments), 1) + for user in self.data_researcher: + assignments = get_user_role_assignments_in_scope( + user_external_key=user.username, scope_external_key=self.course_id + ) + # Since we are mocking the COURSE_ROLE_EQUIVALENCES mapping to only include a mapping for COURSE_ADMIN, + # the data_researcher role will not have a legacy role equivalent and therefore should not be migrated back + self.assertEqual(len(assignments), 1) + + # 3 staff + 3 limited_staff + 3 data_researcher = 9 entries with no legacy role equivalent + self.assertEqual(len(permissions_with_errors), 9) + + state_after_migration_user_subjects = list( + UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False) + .distinct() + .order_by("id") + .values("id", "user_id") + ) + after_migrate_state_access_roles = list( + CourseAccessRole.objects.all().order_by("id").values("id", "user_id", "org", "course_id", "role") + ) + + # Before the rollback, we should only have the 1 invalid role entry + # since we set delete_after_migration to True in the migration. + self.assertEqual(len(original_state_access_roles), 1) + + # All original entries (1) + 3 users * 1 roles = 4 + self.assertEqual(len(after_migrate_state_access_roles), 1 + 3) + + # Before the rollback, we should have the 12 UserSubjects related to the course permissions + # since we had 3 users with 4 roles each in the original state. + self.assertEqual(len(original_state_user_subjects), 12) + + # After rollback, we should have 9 UserSubjects related to the course permissions + # since the users with staff, limited_staff and data_researcher roles will not be + # migrated back to legacy roles due to our mocked COURSE_ROLE_EQUIVALENCES mapping. + self.assertEqual(len(state_after_migration_user_subjects), 9) + + @patch("openedx_authz.management.commands.authz_migrate_course_authoring.CourseAccessRole", CourseAccessRole) + @patch("openedx_authz.management.commands.authz_migrate_course_authoring.migrate_legacy_course_roles_to_authz") + def test_authz_migrate_course_authoring_command(self, mock_migrate): + """ + Verify that the authz_migrate_course_authoring command + calls migrate_legacy_course_roles_to_authz with the correct arguments. + """ + + mock_migrate.return_value = [] + + # Run without --delete + call_command("authz_migrate_course_authoring") + + mock_migrate.assert_called_once() + args, kwargs = mock_migrate.call_args + + self.assertEqual(kwargs["delete_after_migration"], False) + + mock_migrate.reset_mock() + + # Run with --delete + with patch("builtins.input", return_value="yes"): + call_command("authz_migrate_course_authoring", "--delete") + + mock_migrate.assert_called_once() + args, kwargs = mock_migrate.call_args + + self.assertEqual(kwargs["delete_after_migration"], True) + + @patch("openedx_authz.management.commands.authz_rollback_course_authoring.CourseAccessRole", CourseAccessRole) + @patch("openedx_authz.management.commands.authz_rollback_course_authoring.migrate_authz_to_legacy_course_roles") + def test_authz_rollback_course_authoring_command(self, mock_rollback): + """ + Verify that the authz_rollback_course_authoring command + calls migrate_authz_to_legacy_course_roles correctly. + """ + + mock_rollback.return_value = [] + + # Run without --delete + call_command("authz_rollback_course_authoring") + + mock_rollback.assert_called_once() + args, kwargs = mock_rollback.call_args + + self.assertEqual(kwargs["delete_after_migration"], False) + + mock_rollback.reset_mock() + + # Run with --delete + with patch("builtins.input", return_value="yes"): + call_command("authz_rollback_course_authoring", "--delete") + + self.assertEqual(mock_rollback.call_count, 1) + + call_kwargs = mock_rollback.call_args_list[0][1] + + self.assertEqual(call_kwargs["delete_after_migration"], True)