Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions misc/management/commands/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from scoring.jobs import (
finalize_leaderboards,
update_global_comment_and_question_leaderboards,
update_custom_leaderboards,
)
from scoring.utils import update_medal_points_and_ranks

Expand Down Expand Up @@ -200,6 +201,13 @@ def handle(self, *args, **options):
max_instances=1,
replace_existing=True,
)
scheduler.add_job(
close_old_connections(update_custom_leaderboards),
trigger=CronTrigger.from_crontab("0 5 * * *"), # Every day at 05:00 UTC
id="update_custom_leaderboards",
max_instances=1,
replace_existing=True,
)

#
# Comment Jobs
Expand Down
9 changes: 9 additions & 0 deletions scoring/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from scoring.constants import LeaderboardScoreTypes
from scoring.models import Leaderboard
from scoring.utils import update_project_leaderboard
from scoring.tasks import update_custom_leaderboard

logger = logging.getLogger(__name__)

Expand All @@ -30,3 +31,11 @@ def finalize_leaderboards():
if finalize_time and finalize_time <= timezone.now():
logger.info(f"Finalizing leaderboard: {leaderboard}")
update_project_leaderboard(leaderboard=leaderboard)


def update_custom_leaderboards():
update_custom_leaderboard(
project_id=1,
minimum_time=None,
spot_times=None,
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the specific leaderboard configurations here; be sure to add failsafes.

165 changes: 165 additions & 0 deletions scoring/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import logging
from datetime import datetime
from collections import defaultdict

from django.db.models import QuerySet
import dramatiq

from posts.models import Post
from scoring.constants import LeaderboardScoreTypes, ScoreTypes
from scoring.models import Leaderboard, Score
from scoring.score_math import evaluate_question
from projects.models import Project
from questions.models import Question
from questions.constants import UnsuccessfulResolutionType
from scoring.utils import generate_entries_from_scores, process_entries_for_leaderboard

logger = logging.getLogger(__name__)


def calculate_minimum_time_scores(
    questions: QuerySet[Question],
    minimum_time: datetime,
    score_type: ScoreTypes = ScoreTypes.PEER,
) -> list[Score]:
    """Return scores for ``questions`` as if none opened before ``minimum_time``.

    Questions that opened at or after ``minimum_time`` contribute their
    already-stored scores of ``score_type``. Earlier questions are
    re-evaluated with ``open_time`` shifted forward to ``minimum_time``;
    the shift is in memory only — the question is never saved.

    The returned simulated ``Score`` objects are not persisted here.
    """
    scores: list[Score] = []

    total = questions.count()
    for index, question in enumerate(questions, start=1):
        # Lazy %-style args avoid building the message when INFO is disabled.
        logger.info("Processing question %s/%s (ID: %s)", index, total, question.id)
        if question.open_time >= minimum_time:
            # Already satisfies the cutoff — reuse the stored scores.
            scores.extend(question.scores.filter(score_type=score_type))
            continue
        # Simulate scores as if the question's open_time was minimum_time.
        question.open_time = minimum_time
        new_scores = evaluate_question(
            question=question,
            resolution=question.resolution,
            score_types=[score_type],
        )
        scores.extend(new_scores)

    return scores


def calculate_spot_times_scores(
    questions: QuerySet[Question],
    spot_times: list[datetime],
    score_type: ScoreTypes = ScoreTypes.SPOT_PEER,
) -> list[Score]:
    """Simulate spot scores at each of ``spot_times`` and average them.

    For every question a score is evaluated per spot time. Each
    (user_id, aggregation_method) pair then gets one in-memory ``Score``
    whose value is the sum of its per-spot scores divided by
    ``len(spot_times)`` (so a spot time without a score pulls the average
    toward zero) and whose coverage is the fraction of spot times the pair
    has a score for.

    The returned ``Score`` objects are not persisted here.
    """
    scores: list[Score] = []

    total = questions.count()
    for index, question in enumerate(questions, start=1):
        # Lazy %-style args avoid building the message when INFO is disabled.
        logger.info("Processing question %s/%s (ID: %s)", index, total, question.id)
        question_scores: list[Score] = []
        for spot_time in spot_times:
            # Simulate scores as if question spot_scoring_time was spot_time.
            new_scores = evaluate_question(
                question=question,
                resolution=question.resolution,
                score_types=[score_type],
                spot_forecast_time=spot_time,
            )
            question_scores.extend(new_scores)
        # Group the per-spot scores by scoring identity before averaging.
        user_score_map = defaultdict(list)
        for score in question_scores:
            user_score_map[(score.user_id, score.aggregation_method)].append(
                score.score
            )
        for (user_id, aggregation_method), user_scores in user_score_map.items():
            scores.append(
                Score(
                    user_id=user_id,
                    aggregation_method=aggregation_method,
                    # Divide by len(spot_times), not len(user_scores):
                    # missing spots are counted as zero in the average.
                    score=sum(user_scores) / len(spot_times),
                    score_type=score_type,
                    question=question,
                    coverage=len(user_scores) / len(spot_times),
                )
            )

    return scores


@dramatiq.actor
def update_custom_leaderboard(
    project_id: int,
    minimum_time: datetime | None = None,
    spot_times: list[datetime] | None = None,
    score_type: ScoreTypes = ScoreTypes.PEER,
) -> None:
    """Create/refresh a custom MANUAL leaderboard for one project.

    Exactly one of ``minimum_time`` or ``spot_times`` must be given:

    * ``minimum_time`` — scores are simulated as if no question opened
      before that time (see ``calculate_minimum_time_scores``).
    * ``spot_times`` — spot scores are simulated at each given time and
      averaged (see ``calculate_spot_times_scores``).

    Invalid input is logged and ignored rather than raised, since this is
    expected to run unattended.
    """
    project = Project.objects.filter(id=project_id).first()
    if not project:
        logger.error(f"Project with id {project_id} does not exist.")
        return
    # Falsy check: an empty spot_times list counts as "not provided".
    if (not minimum_time and not spot_times) or (minimum_time and spot_times):
        logger.error("minimum_time or spot_times must be provided, but not both.")
        return

    # setup
    # The name encodes the simulation parameters, so a different
    # minimum_time (or spot-time count) yields a *new* leaderboard row.
    name = (
        f"Set open_time for {project.name} at {minimum_time}"
        if minimum_time
        else (f"Spot time for {project.name} at {len(spot_times)} spot times")
    )
    leaderboard, _ = Leaderboard.objects.get_or_create(
        prize_pool=0,
        name=name,
        project=project,
        score_type=LeaderboardScoreTypes.MANUAL,
    )
    # Only approved posts with a successful resolution are scored.
    questions = (
        leaderboard.get_questions()
        .filter(
            related_posts__post__curation_status=Post.CurationStatus.APPROVED,
            resolution__isnull=False,
        )
        .exclude(resolution__in=UnsuccessfulResolutionType)
    )
    # detect if any questions actually resolved since last evaluation
    existing_entries = leaderboard.entries.all()
    if existing_entries.exists():
        # NOTE(review): max() raises ValueError if entries exist but none
        # has calculated_on set — confirm calculated_on is always populated.
        last_evaluation_time = max(
            entry.calculated_on for entry in existing_entries if entry.calculated_on
        )
        # NOTE(review): on this incremental path, scores/entries are built
        # only from the newly resolved questions, yet
        # process_entries_for_leaderboard replaces *all* existing entries —
        # contributions from previously scored questions appear to be
        # dropped. Confirm this is intended.
        questions = questions.filter(resolution_set_time__gt=last_evaluation_time)
        if not questions.exists():
            logger.info(
                "No questions resolved since last evaluation "
                f"at {last_evaluation_time}, skipping leaderboard update."
            )
            return

    # Exactly one of the two branches below runs (validated above),
    # so `scores` is always bound afterward.
    if minimum_time:
        scores = calculate_minimum_time_scores(questions, minimum_time, score_type)

    if spot_times:
        # Spot simulation needs a spot score type; upgrade non-spot defaults.
        if score_type == ScoreTypes.PEER:
            score_type = ScoreTypes.SPOT_PEER
        if score_type == ScoreTypes.BASELINE:
            score_type = ScoreTypes.SPOT_BASELINE
        scores = calculate_spot_times_scores(questions, spot_times, score_type)

    # temporarily change leaderboard type for entry creation
    # (in memory only — the row was created as MANUAL and is reset below)
    if score_type in [ScoreTypes.PEER, ScoreTypes.SPOT_PEER]:
        leaderboard.score_type = LeaderboardScoreTypes.PEER_TOURNAMENT
    elif score_type in [ScoreTypes.BASELINE, ScoreTypes.SPOT_BASELINE]:
        # NOTE(review): plain BASELINE (minimum_time path) also maps to the
        # *spot* baseline tournament type — confirm this is intentional.
        leaderboard.score_type = LeaderboardScoreTypes.SPOT_BASELINE_TOURNAMENT
    else:
        # NOTE(review): assigns a ScoreTypes value to a field that elsewhere
        # holds LeaderboardScoreTypes values — verify downstream handling.
        leaderboard.score_type = score_type
    new_entries = generate_entries_from_scores(scores, questions, leaderboard)
    leaderboard.score_type = LeaderboardScoreTypes.MANUAL

    process_entries_for_leaderboard(
        new_entries, project, leaderboard, force_finalize=False
    )

    logger.info(f"Updated leaderboard: {leaderboard.name} with id {leaderboard.id}")
    return
101 changes: 64 additions & 37 deletions scoring/utils.py
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only things happening in this file is factoring out bits from a few bulky functions so they can be used by the new custom leaderboard update task.

Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,10 @@ def score_question(
invalidate_average_coverage_cache([question])


def generate_scoring_leaderboard_entries(
def retrieve_question_scores(
questions: list[Question],
leaderboard: Leaderboard,
) -> list[LeaderboardEntry]:
) -> list[Score | ArchivedScore]:
score_type = LeaderboardScoreTypes.get_base_score(leaderboard.score_type) or F(
"question__default_score_type"
)
Expand Down Expand Up @@ -176,6 +176,14 @@ def generate_scoring_leaderboard_entries(
scores = list(archived_scores) + list(calculated_scores)
scores = sorted(scores, key=lambda x: x.user_id or x.score)

return scores


def generate_entries_from_scores(
scores: list[Score | ArchivedScore],
questions: list[Question],
leaderboard: Leaderboard,
) -> list[LeaderboardEntry]:
entries: dict[int | AggregationMethod, LeaderboardEntry] = {}
now = timezone.now()
maximum_coverage = sum(
Expand Down Expand Up @@ -219,6 +227,14 @@ def generate_scoring_leaderboard_entries(
return sorted(entries.values(), key=lambda entry: entry.score, reverse=True)


def generate_scoring_leaderboard_entries(
    questions: list[Question],
    leaderboard: Leaderboard,
) -> list[LeaderboardEntry]:
    """Build leaderboard entries for ``leaderboard`` from its question scores.

    Thin composition of the two extracted steps: fetch/compute the scores
    for the given questions, then turn them into ranked-entry candidates.
    """
    return generate_entries_from_scores(
        retrieve_question_scores(questions, leaderboard),
        questions,
        leaderboard,
    )


def generate_comment_insight_leaderboard_entries(
leaderboard: Leaderboard,
) -> list[LeaderboardEntry]:
Expand Down Expand Up @@ -714,40 +730,21 @@ def assign_prizes(
return entries


def update_project_leaderboard(
project: Project | None = None,
leaderboard: Leaderboard | None = None,
force_update: bool = False,
def process_entries_for_leaderboard(
entries: list[LeaderboardEntry],
project: Project,
leaderboard: Leaderboard,
force_finalize: bool = False,
) -> list[LeaderboardEntry]:
if project is None and leaderboard is None:
raise ValueError("Either project or leaderboard must be provided")

leaderboard = leaderboard or project.primary_leaderboard
project = project or leaderboard.project
if not leaderboard:
raise ValueError("Leaderboard not found")

if leaderboard.score_type == LeaderboardScoreTypes.MANUAL:
logger.info("%s is manual, not updating", leaderboard.name)
return list(leaderboard.entries.all().order_by("rank"))

if not force_update and leaderboard.finalized:
logger.warning("%s is already finalized, not updating", str(leaderboard))
return list(leaderboard.entries.all().order_by("rank"))

# new entries
new_entries = generate_project_leaderboard(project, leaderboard)

# assign ranks - also applies exclusions
bot_status = leaderboard.bot_status or project.bot_leaderboard_status
bots_get_ranks = bot_status in [
Project.BotLeaderboardStatus.BOTS_ONLY,
Project.BotLeaderboardStatus.INCLUDE,
]
humans_get_ranks = bot_status != Project.BotLeaderboardStatus.BOTS_ONLY
new_entries = assign_ranks(
new_entries,
entries = assign_ranks(
entries,
leaderboard,
include_humans=humans_get_ranks,
include_bots=bots_get_ranks,
Expand All @@ -762,10 +759,10 @@ def update_project_leaderboard(
minimum_prize_percent = (
float(leaderboard.minimum_prize_amount) / float(prize_pool) if prize_pool else 0
)
new_entries = assign_prize_percentages(new_entries, minimum_prize_percent)
entries = assign_prize_percentages(entries, minimum_prize_percent)

if prize_pool: # always assign prizes
new_entries = assign_prizes(new_entries, prize_pool)
entries = assign_prizes(entries, prize_pool)
# check if we're ready to finalize and assign medals/prizes if applicable
finalize_time = leaderboard.finalize_time or (
project.close_date if project else None
Expand All @@ -781,7 +778,7 @@ def update_project_leaderboard(
and project.default_permission == ObjectPermission.FORECASTER
and project.visibility == Project.Visibility.NORMAL
):
new_entries = assign_medals(new_entries)
entries = assign_medals(entries)
# always set finalize
Leaderboard.objects.filter(pk=leaderboard.pk).update(finalized=True)

Expand All @@ -791,17 +788,47 @@ def update_project_leaderboard(
for entry in leaderboard.entries.all()
}

for new_entry in new_entries:
new_entry.leaderboard = leaderboard
new_entry.id = previous_entries_map.get(
(new_entry.user_id, new_entry.aggregation_method)
)
for entry in entries:
entry.leaderboard = leaderboard
entry.id = previous_entries_map.get((entry.user_id, entry.aggregation_method))

with transaction.atomic():
leaderboard.entries.all().delete()
LeaderboardEntry.objects.bulk_create(new_entries, batch_size=500)
LeaderboardEntry.objects.bulk_create(entries, batch_size=500)

return new_entries
return entries


def update_project_leaderboard(
    project: Project | None = None,
    leaderboard: Leaderboard | None = None,
    force_update: bool = False,
    force_finalize: bool = False,
) -> list[LeaderboardEntry]:
    """Regenerate and persist the entries of a project's leaderboard.

    At least one of ``project`` / ``leaderboard`` must be given; the other
    is derived from it. MANUAL and already-finalized leaderboards are left
    untouched (the latter unless ``force_update`` is set) and their stored
    entries are returned as-is, ordered by rank.
    """
    if project is None and leaderboard is None:
        raise ValueError("Either project or leaderboard must be provided")

    # Derive whichever of the pair was omitted.
    leaderboard = leaderboard or project.primary_leaderboard
    project = project or leaderboard.project
    if not leaderboard:
        raise ValueError("Leaderboard not found")

    # Manual leaderboards are curated by hand — never overwrite them.
    if leaderboard.score_type == LeaderboardScoreTypes.MANUAL:
        logger.info("%s is manual, not updating", leaderboard.name)
        return list(leaderboard.entries.all().order_by("rank"))

    # Finalized leaderboards are frozen unless an update is forced.
    if leaderboard.finalized and not force_update:
        logger.warning("%s is already finalized, not updating", str(leaderboard))
        return list(leaderboard.entries.all().order_by("rank"))

    # Recompute the entries, then rank / prize / persist them.
    return process_entries_for_leaderboard(
        generate_project_leaderboard(project, leaderboard),
        project,
        leaderboard,
        force_finalize=force_finalize,
    )


def update_leaderboard_from_csv_data(
Expand Down