Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ Change Log
Unreleased
**********

0.23.0 - 2026-02-18
********************

Added
=====

* Add authz_migrate_course_authoring command to migrate legacy CourseAccessRole data to the new Authz (Casbin-based) system
* Add authz_rollback_course_authoring command to rollback Authz roles back to legacy CourseAccessRole
* Support optional --delete flag for controlled cleanup of source permissions after successful migration
* Add migrate_legacy_course_roles_to_authz and migrate_authz_to_legacy_course_roles service functions
* Add unit tests to verify migration and command behavior

Added
=====

Expand Down
2 changes: 1 addition & 1 deletion openedx_authz/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@

import os

__version__ = "0.22.0"
__version__ = "0.23.0"

ROOT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))
152 changes: 150 additions & 2 deletions openedx_authz/engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,32 @@

from casbin import Enforcer

from openedx_authz.api.users import assign_role_to_user_in_scope, batch_assign_role_to_users_in_scope
from openedx_authz.constants.roles import LIBRARY_ADMIN, LIBRARY_AUTHOR, LIBRARY_USER
from openedx_authz.api.users import (
assign_role_to_user_in_scope,
batch_assign_role_to_users_in_scope,
batch_unassign_role_from_users,
get_user_role_assignments,
)
from openedx_authz.constants.roles import (
COURSE_ADMIN,
COURSE_DATA_RESEARCHER,
COURSE_LIMITED_STAFF,
COURSE_STAFF,
LEGACY_COURSE_ROLE_EQUIVALENCES,
LIBRARY_ADMIN,
LIBRARY_AUTHOR,
LIBRARY_USER,
)

logger = logging.getLogger(__name__)

GROUPING_POLICY_PTYPES = ["g", "g2", "g3", "g4", "g5", "g6"]


# Map new roles back to legacy roles for rollback purposes
COURSE_ROLE_EQUIVALENCES = {v: k for k, v in LEGACY_COURSE_ROLE_EQUIVALENCES.items()}


def migrate_policy_between_enforcers(
source_enforcer: Enforcer,
target_enforcer: Enforcer,
Expand Down Expand Up @@ -151,3 +169,133 @@ def migrate_legacy_permissions(ContentLibraryPermission):
)

return permissions_with_errors


def migrate_legacy_course_roles_to_authz(CourseAccessRole, delete_after_migration):
"""
Migrate legacy course role data to the new Casbin-based authorization model.
This function reads legacy permissions from the CourseAccessRole model
and assigns equivalent roles in the new authorization system.

The old Course permissions are stored in the CourseAccessRole model, it consists of the following columns:

- user: FK to User
- org: optional Organization string
- course_id: optional CourseKeyField of Course
- role: 'instructor' | 'staff' | 'limited_staff' | 'data_researcher'

In the new Authz model, this would roughly translate to:

- course_id: scope
- user: subject
- role: role

param CourseAccessRole: The CourseAccessRole model to use.
"""

legacy_permissions = CourseAccessRole.objects.select_related("user").all()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to make sure to only do the migration on courses that have the flag enabled.


# List to keep track of any permissions that could not be migrated
permissions_with_errors = []
permissions_with_no_errors = []

for permission in legacy_permissions:
# Migrate the permission to the new model

# Derive equivalent role based on access level
map_legacy_role = {
"instructor": COURSE_ADMIN,
"staff": COURSE_STAFF,
"limited_staff": COURSE_LIMITED_STAFF,
"data_researcher": COURSE_DATA_RESEARCHER,
}

role = map_legacy_role.get(permission.role)
if role is None:
# This should not happen as there are no more access_levels defined
# in CourseAccessRole, log and skip
logger.error(f"Unknown access level: {permission.role} for User: {permission.user}")
permissions_with_errors.append(permission)
continue

# Permission applied to individual user
logger.info(
f"Migrating permission for User: {permission.user.username} "
f"to Role: {role.external_key} in Scope: {permission.course_id}"
)

assign_role_to_user_in_scope(
user_external_key=permission.user.username,
role_external_key=role.external_key,
scope_external_key=str(permission.course_id),
)
permissions_with_no_errors.append(permission)

if delete_after_migration:
CourseAccessRole.objects.filter(id__in=[p.id for p in permissions_with_no_errors]).delete()

return permissions_with_errors


def migrate_authz_to_legacy_course_roles(CourseAccessRole, UserSubject, delete_after_migration):
"""
Migrate permissions from the new Casbin-based authorization model back to the legacy CourseAccessRole model.
This function reads permissions from the Casbin enforcer and creates equivalent entries in the
CourseAccessRole model.

This is essentially the reverse of migrate_legacy_course_roles_to_authz and is intended
for rollback purposes in case of migration issues.
"""
# 1. Get all users with course-related permissions in the new model by filtering
# UserSubjects that are linked to CourseScopes with a valid course overview.
course_subjects = (
UserSubject.objects.filter(casbin_rules__scope__coursescope__course_overview__isnull=False)
.select_related("user")
.distinct()
)

roles_with_errors = []

for course_subject in course_subjects:
user = course_subject.user
user_external_key = user.username

# 2. Get all role assignments for the user
role_assignments = get_user_role_assignments(user_external_key=user_external_key)

for assignment in role_assignments:
scope = assignment.scope.external_key

course_overview = assignment.scope.get_object()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only rollback courses that don't have the flag enabled


for role in assignment.roles:
legacy_role = COURSE_ROLE_EQUIVALENCES.get(role.external_key)
if legacy_role is None:
logger.error(f"Unknown role: {role} for User: {user_external_key}")
roles_with_errors.append((user_external_key, role.external_key, scope))
continue

try:
# Create legacy CourseAccessRole entry
CourseAccessRole.objects.get_or_create(
user=user,
org=course_overview.org,
course_id=scope,
role=legacy_role,
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error(
f"Error creating CourseAccessRole for User: "
f"{user_external_key}, Role: {legacy_role}, Course: {scope}: {e}"
)
roles_with_errors.append((user_external_key, role.external_key, scope))
continue

# If we successfully created the legacy role, we can unassign the new role
if delete_after_migration:
batch_unassign_role_from_users(
users=[user_external_key],
role_external_key=role.external_key,
scope_external_key=scope,
)
return roles_with_errors
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
Django management command to migrate legacy course authoring roles to the new Authz (Casbin-based) authorization system.
"""

from django.core.management.base import BaseCommand
from django.db import transaction

from openedx_authz.engine.utils import migrate_legacy_course_roles_to_authz

try:
from common.djangoapps.student.models import CourseAccessRole
except ImportError:
CourseAccessRole = None # type: ignore


class Command(BaseCommand):
"""
Django command to migrate legacy CourseAccessRole data
to the new Authz (Casbin-based) authorization system.
"""

help = "Migrate legacy course authoring roles to the new Authz system."

def add_arguments(self, parser):
parser.add_argument(
"--delete",
action="store_true",
help="Delete legacy CourseAccessRole records after successful migration.",
)

def handle(self, *args, **options):
delete_after_migration = options["delete"]

self.stdout.write(self.style.WARNING("Starting legacy → Authz migration..."))

try:
with transaction.atomic():
errors = migrate_legacy_course_roles_to_authz(
CourseAccessRole=CourseAccessRole,
delete_after_migration=False, # control deletion here instead
)

if errors:
self.stdout.write(self.style.ERROR(f"Migration completed with {len(errors)} errors."))
else:
self.stdout.write(self.style.SUCCESS("Migration completed successfully with no errors."))

# Handle deletion separately for safety
if delete_after_migration:
confirm = input(
"Are you sure you want to delete successfully migrated legacy roles? Type 'yes' to continue: "
)

if confirm != "yes":
self.stdout.write(self.style.WARNING("Deletion aborted."))
return

migrated_ids = [p.id for p in CourseAccessRole.objects.all() if p not in errors]

CourseAccessRole.objects.filter(id__in=migrated_ids).delete()

self.stdout.write(self.style.SUCCESS("Legacy roles deleted successfully."))

except Exception as exc:
self.stdout.write(self.style.ERROR(f"Migration failed due to unexpected error: {exc}"))
raise

self.stdout.write(self.style.SUCCESS("Done."))
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
Django management command to rollback course authoring roles from the new Authz (Casbin-based)
authorization system back to the legacy CourseAccessRole model.
"""

from django.core.management.base import BaseCommand
from django.db import transaction

from openedx_authz.engine.utils import migrate_authz_to_legacy_course_roles
from openedx_authz.models.subjects import UserSubject

try:
from common.djangoapps.student.models import CourseAccessRole
except ImportError:
CourseAccessRole = None # type: ignore


class Command(BaseCommand):
"""
Django command to rollback course authoring roles
from the new Authz system back to legacy CourseAccessRole.
"""

help = "Rollback Authz course authoring roles to legacy CourseAccessRole."

def add_arguments(self, parser):
parser.add_argument(
"--delete",
action="store_true",
help="Delete Authz role assignments after successful rollback.",
)

def handle(self, *args, **options):
delete_after_migration = options["delete"]

self.stdout.write(self.style.WARNING("Starting Authz → Legacy rollback migration..."))

try:
with transaction.atomic():
errors = migrate_authz_to_legacy_course_roles(
CourseAccessRole=CourseAccessRole,
UserSubject=UserSubject,
delete_after_migration=False, # control deletion here
)

if errors:
self.stdout.write(self.style.ERROR(f"Rollback completed with {len(errors)} errors."))
else:
self.stdout.write(self.style.SUCCESS("Rollback completed successfully with no errors."))

# Handle deletion separately for safety
if delete_after_migration:
confirm = input(
"Are you sure you want to remove the new Authz role "
"assignments after rollback? Type 'yes' to continue: "
)

if confirm != "yes":
self.stdout.write(self.style.WARNING("Deletion aborted."))
return

# Re-run with deletion enabled
migrate_authz_to_legacy_course_roles(
CourseAccessRole=CourseAccessRole,
UserSubject=UserSubject,
delete_after_migration=True,
)

self.stdout.write(self.style.SUCCESS("Authz role assignments removed successfully."))

except Exception as exc:
self.stdout.write(self.style.ERROR(f"Rollback failed due to unexpected error: {exc}"))
raise

self.stdout.write(self.style.SUCCESS("Done."))
54 changes: 54 additions & 0 deletions openedx_authz/tests/stubs/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,3 +125,57 @@ def get_from_id(cls, course_key):
key = str(course_key)
obj, _ = cls.objects.get_or_create(id=key)
return obj


class NoneToEmptyQuerySet(models.query.QuerySet):
"""
A :class:`django.db.query.QuerySet` that replaces `None` values passed to `filter` and `exclude`
with the corresponding `Empty` value for all fields with an `Empty` attribute.

This is to work around Django automatically converting `exact` queries for `None` into
`isnull` queries before the field has a chance to convert them to queries for it's own
empty value.
"""

def _filter_or_exclude(self, *args, **kwargs):
for field_object in self.model._meta.get_fields():
direct = not field_object.auto_created or field_object.concrete
if direct and hasattr(field_object, "Empty"):
for suffix in ("", "_exact"):
key = f"{field_object.name}{suffix}"
if key in kwargs and kwargs[key] is None:
kwargs[key] = field_object.Empty

return super()._filter_or_exclude(*args, **kwargs)


class NoneToEmptyManager(models.Manager):
"""
A :class:`django.db.models.Manager` that has a :class:`NoneToEmptyQuerySet`
as its `QuerySet`, initialized with a set of specified `field_names`.
"""

def get_queryset(self):
"""
Returns the result of NoneToEmptyQuerySet instead of a regular QuerySet.
"""
return NoneToEmptyQuerySet(self.model, using=self._db)


class CourseAccessRole(models.Model):
"""
Maps users to org, courses, and roles. Used by student.roles.CourseRole and OrgRole.
To establish a user as having a specific role over all courses in the org, create an entry
without a course_id.

.. no_pii:
"""

objects = NoneToEmptyManager()

user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
# blank org is for global group based roles such as course creator (may be deprecated)
org = models.CharField(max_length=64, db_index=True, blank=True)
# blank course_id implies org wide role
course_id = CourseKeyField(max_length=255, db_index=True, blank=True)
role = models.CharField(max_length=64, db_index=True)
Loading