Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions completion_aggregator/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import absolute_import, division, print_function, unicode_literals

from eventtracking import tracker
from opaque_keys.edx.django.models import CourseKeyField, UsageKeyField
from opaque_keys.edx.keys import CourseKey, UsageKey

Expand Down Expand Up @@ -171,8 +172,44 @@ def submit_completion(self, user, course_key, block_key, aggregation_name, earne
'last_modified': last_modified,
},
)
self.emit_completion_aggregator_logs([obj])

return obj, is_new

@staticmethod
def emit_completion_aggregator_logs(updated_aggregators):
"""
Emit a tracking log for each element of the list parameter.

Parameters
----------
updated_aggregators: List of Aggregator intances

"""
for obj in updated_aggregators:
event = "progress" if obj.percent < 1 else "completion"
event_type = obj.aggregation_name

if event_type not in settings.ALLOWED_COMPLETION_AGGREGATOR_EVENT_TYPES.get(event, []):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if event_type not in settings.ALLOWED_COMPLETION_AGGREGATOR_EVENT_TYPES.get(event, []):
if event_type not in settings.ALLOWED_COMPLETION_AGGREGATOR_EVENT_TYPES.get(event, {}):

continue

event_name = f"edx.completion_aggregator.{event}.{event_type}"

tracker.emit(
event_name,
{
"user_id": obj.user_id,
"course_id": str(obj.course_key),
"block_id": str(obj.block_key),
"modified": obj.modified,
"created": obj.created,
"earned": obj.earned,
"possible": obj.possible,
"percent": obj.percent,
"type": event_type,
}
)
Comment on lines +198 to +211
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the cost of this operation (especially when we use multiple event-tracking backends)? Is it possible to emit multiple events in bulk?

I want to be particularly cautious about this, as we had faced a partial outage before due to the high volume of additional operations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After running a basic test in my local I got the following
image

As you can see the time is in the order of 0.002 seconds, that test was performed with the following EVENT_TRACKING_BACKENDS value

{'tracking_logs': {'ENGINE': 'eventtracking.backends.routing.RoutingBackend',
  'OPTIONS': {'backends': {'logger': {'ENGINE': 'eventtracking.backends.logger.LoggerBackend',
     'OPTIONS': {'name': 'tracking', 'max_event_size': 50000}}},
   'processors': [{'ENGINE': 'common.djangoapps.track.shim.LegacyFieldMappingProcessor'},
    {'ENGINE': 'common.djangoapps.track.shim.PrefixedEventProcessor'}]}},
 'segmentio': {'ENGINE': 'eventtracking.backends.routing.RoutingBackend',
  'OPTIONS': {'backends': {'segment': {'ENGINE': 'eventtracking.backends.segment.SegmentBackend'}},
   'processors': [{'ENGINE': 'eventtracking.processors.whitelist.NameWhitelistProcessor',
     'OPTIONS': {'whitelist': []}},
    {'ENGINE': 'common.djangoapps.track.shim.GoogleAnalyticsProcessor'}]}},
 'xapi': {'ENGINE': 'eventtracking.backends.async_routing.AsyncRoutingBackend',
  'OPTIONS': {'backend_name': 'xapi',
   'processors': [{'ENGINE': 'eventtracking.processors.whitelist.NameWhitelistProcessor',
     'OPTIONS': {'whitelist': ['edx.problem.completed',
       'showanswer',
       'edx.completion.block_completion.changed',
       'edx.grades.problem.submitted',
       'edx.course.enrollment.activated',
       'edx.course.enrollment.deactivated',
       'edx.course.grade.passed.first_time',
       'edx.course.grade.now_passed',
       'edx.course.grade.now_failed',
       'load_video',
       'edx.video.loaded',
       'play_video',
       'edx.video.played',
       'stop_video',
       'edx.video.stopped',
       'complete_video',
       'edx.video.completed',
       'pause_video',
       'edx.video.paused']}}],
   'backends': {'xapi': {'ENGINE': 'event_routing_backends.backends.events_router.EventsRouter',
     'OPTIONS': {'processors': [{'ENGINE': 'event_routing_backends.processors.xapi.transformer_processor.XApiProcessor',
        'OPTIONS': {}}],
      'backend_name': 'xapi'}}}}},
 'caliper': {'ENGINE': 'eventtracking.backends.async_routing.AsyncRoutingBackend',
  'OPTIONS': {'backend_name': 'caliper',
   'processors': [{'ENGINE': 'eventtracking.processors.whitelist.NameWhitelistProcessor',
     'OPTIONS': {'whitelist': ['edx.course.enrollment.activated',
       'edx.course.enrollment.deactivated',
       'edx.ui.lms.link_clicked',
       'edx.ui.lms.sequence.outline.selected',
       'edx.ui.lms.outline.selected',
       'edx.ui.lms.sequence.next_selected',
       'edx.ui.lms.sequence.previous_selected',
       'edx.ui.lms.sequence.tab_selected',
       'showanswer',
       'edx.problem.hint.demandhint_displayed',
       'problem_check',
       'load_video',
       'edx.video.loaded',
       'play_video',
       'edx.video.played',
       'complete_video',
       'edx.video.completed',
       'stop_video',
       'edx.video.stopped',
       'pause_video',
       'edx.video.paused',
       'seek_video',
       'edx.video.position.changed',
       'edx.course.grade.passed.first_time',
       'edx.course.grade.now_passed',
       'edx.course.grade.now_failed']}}],
   'backends': {'caliper': {'ENGINE': 'event_routing_backends.backends.events_router.EventsRouter',
     'OPTIONS': {'processors': [{'ENGINE': 'event_routing_backends.processors.caliper.transformer_processor.CaliperProcessor',
        'OPTIONS': {}},
       {'ENGINE': 'event_routing_backends.processors.caliper.envelope_processor.CaliperEnvelopeProcessor',
        'OPTIONS': {'sensor_id': 'https://localhost:18000'}}],
      'backend_name': 'caliper'}}}}}}

If I'm not wrong tracking_logs and segmentio comes by default, xapi and caliper are added by the event-routing-backends library.

Finally I'm not aware of a bulk emit capability, I couldn't find anything related to that in the eventtracking library and all the implementation that I found are using the single emit method


def bulk_create_or_update(self, updated_aggregators):
"""
Update the collection of aggregator object using mysql insert on duplicate update query.
Expand All @@ -194,6 +231,7 @@ def bulk_create_or_update(self, updated_aggregators):
else:
aggregation_data = [obj.get_values() for obj in updated_aggregators]
cur.executemany(INSERT_OR_UPDATE_AGGREGATOR_QUERY, aggregation_data)
self.emit_completion_aggregator_logs(updated_aggregators)


class Aggregator(TimeStampedModel):
Expand Down
17 changes: 17 additions & 0 deletions completion_aggregator/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,23 @@ def plugin_settings(settings):
"""
Modify the provided settings object with settings specific to this plugin.
"""
# Emit feature allows to publish two kind of events progress and completion
# This setting controls which type of event will be published to change the default behavior
# the block type should be removed or added from the progress or completion list.
settings.ALLOWED_COMPLETION_AGGREGATOR_EVENT_TYPES = {
"progress": [
"course",
"chapter",
"sequential",
"vertical",
],
"completion": [
"course",
"chapter",
"sequential",
"vertical",
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"progress": [
"course",
"chapter",
"sequential",
"vertical",
],
"completion": [
"course",
"chapter",
"sequential",
"vertical",
]
"progress": {
"course",
"chapter",
"sequential",
"vertical",
},
"completion": {
"course",
"chapter",
"sequential",
"vertical",
}

}
settings.COMPLETION_AGGREGATOR_BLOCK_TYPES = {
'course',
'chapter',
Expand Down
1 change: 1 addition & 0 deletions requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ django-model-utils # Provides TimeStampedModel abstract base class
edx-opaque-keys # Provides CourseKey and UsageKey
edx-completion
edx-toggles
event-tracking
six
XBlock[django]
4 changes: 3 additions & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ edx-toggles==5.1.0
# -r requirements/base.in
# edx-completion
event-tracking==2.2.0
# via edx-completion
# via
# -r requirements/base.in
# edx-completion
fs==2.4.16
# via
# fs-s3fs
Expand Down
17 changes: 17 additions & 0 deletions test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ def root(*args):
return join(abspath(dirname(__file__)), *args)


ALLOWED_COMPLETION_AGGREGATOR_EVENT_TYPES = {
"progress": [
"course",
"chapter",
"sequential",
"vertical",
],
"completion": [
"course",
"chapter",
"sequential",
"vertical",
]
}
AUTH_USER_MODEL = 'auth.User'
CELERY_ALWAYS_EAGER = True
COMPLETION_AGGREGATOR_BLOCK_TYPES = {'course', 'chapter', 'sequential'}
Expand Down Expand Up @@ -53,6 +67,7 @@ def root(*args):
'oauth2_provider',
'waffle',
'test_utils.test_app',
'eventtracking.django.apps.EventTrackingConfig',
)

LOCALE_PATHS = [root('completion_aggregator', 'conf', 'locale')]
Expand Down Expand Up @@ -81,5 +96,7 @@ def root(*args):
]
USE_TZ = True

EVENT_TRACKING_ENABLED = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is unused. Should we use it to gate this feature?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used by the eventtracking library, in the test scenario I'm implementing the default django_tracker that is added by this AppConfig, that uses this method to register the tracker, after checking the DJANGO_ENABLED_SETTING_NAME which is EVENT_TRACKING_ENABLED as you can see in this line


# pylint: disable=unused-import,wrong-import-position
from test_utils.test_app import celery # isort:skip
37 changes: 37 additions & 0 deletions tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
import ddt
import pytest
import six
from mock import patch
from opaque_keys.edx.keys import UsageKey

from django.conf import settings
from django.contrib.auth import get_user_model
from django.core.exceptions import ValidationError
from django.test import TestCase
Expand All @@ -31,6 +33,12 @@ class AggregatorTestCase(TestCase):
def setUp(self):
super().setUp()
self.user = get_user_model().objects.create(username='testuser')
self.tracker_patch = patch('completion_aggregator.models.tracker')
self.tracker_mock = self.tracker_patch.start()

def tearDown(self):
"""Stop patching."""
self.tracker_mock.stop()

def test_submit_completion_with_invalid_user(self):
with pytest.raises(TypeError):
Expand All @@ -43,6 +51,7 @@ def test_submit_completion_with_invalid_user(self):
possible=27.0,
last_modified=now(),
)
self.tracker_mock.assert_not_called()

@ddt.data(
# Valid arguments
Expand All @@ -64,6 +73,7 @@ def test_submit_completion_with_valid_data(self, block_key_obj, aggregate_name,
self.assertEqual(obj.earned, earned)
self.assertEqual(obj.possible, possible)
self.assertEqual(obj.percent, expected_percent)
self.assert_emit_method_called(obj)

@ddt.data(
# Earned greater than possible
Expand Down Expand Up @@ -105,6 +115,7 @@ def test_submit_completion_with_exception(
)

self.assertEqual(exception_message, str(context_manager.exception))
self.tracker_mock.assert_not_called()

@ddt.data(
(
Expand All @@ -129,6 +140,7 @@ def test_aggregate_completion_string(
f'{six.text_type(block_key_obj)}: {expected_percent}'
)
self.assertEqual(six.text_type(obj), expected_string)
self.assert_emit_method_called(obj)

@ddt.data(
# Changes the value of earned. This does not create a new object.
Expand Down Expand Up @@ -179,6 +191,7 @@ def test_submit_completion_twice_with_changes(
)
self.assertEqual(obj.percent, expected_percent)
self.assertTrue(is_new)
self.assert_emit_method_called(obj)

new_obj, is_new = Aggregator.objects.submit_completion(
user=self.user,
Expand All @@ -193,6 +206,7 @@ def test_submit_completion_twice_with_changes(
self.assertEqual(is_new, is_second_obj_new)
if is_second_obj_new:
self.assertNotEqual(obj.id, new_obj.id)
self.assert_emit_method_called(new_obj)

@ddt.data(
(BLOCK_KEY_OBJ, 'course', 0.5, 1, 0.5),
Expand All @@ -211,3 +225,26 @@ def test_get_values(self, block_key_obj, aggregate_name, earned, possible, expec
values = aggregator.get_values()
self.assertEqual(values['user'], self.user.id)
self.assertEqual(values['percent'], expected_percent)

def assert_emit_method_called(self, obj):
"""Verify that the tracker.emit method was called once with the right values."""
if obj.aggregation_name not in settings.ALLOWED_COMPLETION_AGGREGATOR_EVENT_TYPES:
return

event = "progress" if obj.percent < 1 else "completion"

self.tracker_mock.emit.assert_called_once_with(
f"edx.completion_aggregator.{event}.{obj.aggregation_name}",
{
"user_id": obj.user_id,
"course_id": str(obj.course_key),
"block_id": str(obj.block_key),
"modified": obj.modified,
"created": obj.created,
"earned": obj.earned,
"possible": obj.possible,
"percent": obj.percent,
"type": obj.aggregation_name,
}
)
self.tracker_mock.emit.reset_mock()