diff --git a/misc/management/commands/cron.py b/misc/management/commands/cron.py
index cf9360552e..2f86748fd9 100644
--- a/misc/management/commands/cron.py
+++ b/misc/management/commands/cron.py
@@ -26,6 +26,7 @@ from scoring.jobs import (
     finalize_leaderboards,
     update_global_comment_and_question_leaderboards,
+    update_custom_leaderboards,
 )
 from scoring.utils import update_medal_points_and_ranks
@@ -200,6 +201,13 @@ def handle(self, *args, **options):
             max_instances=1,
             replace_existing=True,
         )
+        scheduler.add_job(
+            close_old_connections(update_custom_leaderboards),
+            trigger=CronTrigger.from_crontab("0 5 * * *"),  # Every day at 05:00 UTC
+            id="update_custom_leaderboards",
+            max_instances=1,
+            replace_existing=True,
+        )
 
         #
         # Comment Jobs
diff --git a/scoring/jobs.py b/scoring/jobs.py
index 89c6900a89..5638f32ce6 100644
--- a/scoring/jobs.py
+++ b/scoring/jobs.py
@@ -1,10 +1,13 @@
 import logging
 
 from django.utils import timezone
+from datetime import datetime, timezone as dt_timezone
 
+from projects.models import Project
 from scoring.constants import LeaderboardScoreTypes
 from scoring.models import Leaderboard
 from scoring.utils import update_project_leaderboard
+from scoring.tasks import update_custom_leaderboard
 
 logger = logging.getLogger(__name__)
@@ -30,3 +33,40 @@ def finalize_leaderboards():
         if finalize_time and finalize_time <= timezone.now():
             logger.info(f"Finalizing leaderboard: {leaderboard}")
             update_project_leaderboard(leaderboard=leaderboard)
+
+
+def update_custom_leaderboards():
+    """
+    Trigger the custom leaderboard updates.
+
+    Leaderboards to update are hardcoded here.
+    If adding more, be sure failures are handled gracefully.
+    """
+
+    # US Democracy Threat Index
+    project = Project.objects.filter(
+        slug="us-democracy-threat",
+        type=Project.ProjectTypes.INDEX,
+    ).first()
+    if project:
+        try:
+            update_custom_leaderboard(
+                project_id=project.id,
+                minimum_time=datetime(2025, 12, 12, tzinfo=dt_timezone.utc),
+                spot_times=None,
+            )
+            # TODO: add spot times as they are determined
+            # update_custom_leaderboard(
+            #     project_id=project.id,
+            #     minimum_time=None,
+            #     spot_times=[datetime(2026, 1, 1, tzinfo=dt_timezone.utc)],
+            # )
+        except Exception as e:
+            logger.error(
+                f"Error updating custom leaderboard for project "
+                f"'{project.name}': {e}"
+            )
+    else:
+        # don't warn or error because this project doesn't necessarily exist
+        # in all environments
+        logger.info("Index 'us-democracy-threat' not found.")
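
Note: update_custom_leaderboard is declared as a dramatiq actor in scoring/tasks.py (below), but update_custom_leaderboards() invokes it as a plain function, so the work runs synchronously inside the cron process rather than being queued. A minimal sketch of the distinction, assuming a configured dramatiq broker; the project id in the .send() line is a hypothetical value, not from this change:

    from scoring.jobs import update_custom_leaderboards
    from scoring.tasks import update_custom_leaderboard

    # Called as a plain function, a dramatiq actor executes synchronously
    # in the current process -- the cron job above relies on this behavior.
    update_custom_leaderboards()

    # actor.send(...) would instead enqueue the work for a dramatiq worker;
    # message arguments must then be serializable (JSON by default), so the
    # datetime parameters would need to be passed e.g. as ISO strings.
    update_custom_leaderboard.send(project_id=123)  # hypothetical project id
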
diff --git a/scoring/tasks.py b/scoring/tasks.py
new file mode 100644
index 0000000000..adc23c546b
--- /dev/null
+++ b/scoring/tasks.py
@@ -0,0 +1,170 @@
+import logging
+from datetime import datetime
+from collections import defaultdict
+
+from django.db.models import QuerySet
+import dramatiq
+
+from posts.models import Post
+from scoring.constants import LeaderboardScoreTypes, ScoreTypes
+from scoring.models import Leaderboard, Score
+from scoring.score_math import evaluate_question
+from projects.models import Project
+from questions.models import Question
+from questions.constants import UnsuccessfulResolutionType
+from scoring.utils import generate_entries_from_scores, process_entries_for_leaderboard
+
+logger = logging.getLogger(__name__)
+
+
+def calculate_minimum_time_scores(
+    questions: QuerySet[Question],
+    minimum_time: datetime,
+    score_type: ScoreTypes = ScoreTypes.PEER,
+) -> list[Score]:
+
+    scores: list[Score] = []
+
+    total = questions.count()
+    for i, question in enumerate(questions, start=1):
+        logger.info(f"Processing question {i}/{total} (ID: {question.id})")
+        if question.open_time >= minimum_time:
+            # existing scores are already correct, reuse them
+            scores.extend(question.scores.filter(score_type=score_type))
+            continue
+        question.open_time = minimum_time
+        # simulate scores as if the question's open_time was minimum_time
+        new_scores = evaluate_question(
+            question=question,
+            resolution=question.resolution,
+            score_types=[score_type],
+        )
+        scores.extend(new_scores)
+
+    return scores
+
+
+def calculate_spot_times_scores(
+    questions: QuerySet[Question],
+    spot_times: list[datetime],
+    score_type: ScoreTypes = ScoreTypes.SPOT_PEER,
+) -> list[Score]:
+
+    scores: list[Score] = []
+
+    total = questions.count()
+    for i, question in enumerate(questions, start=1):
+        logger.info(f"Processing question {i}/{total} (ID: {question.id})")
+        question_scores: list[Score] = []
+        for spot_time in spot_times:
+            # simulate scores as if the question's spot_scoring_time was spot_time
+            new_scores = evaluate_question(
+                question=question,
+                resolution=question.resolution,
+                score_types=[score_type],
+                spot_forecast_time=spot_time,
+            )
+            question_scores.extend(new_scores)
+        user_score_map = defaultdict(list)
+        for score in question_scores:
+            user_score_map[(score.user_id, score.aggregation_method)].append(
+                score.score
+            )
+        for (user_id, aggregation_method), user_scores in user_score_map.items():
+            scores.append(
+                Score(
+                    user_id=user_id,
+                    aggregation_method=aggregation_method,
+                    score=sum(user_scores) / len(spot_times),
+                    score_type=score_type,
+                    question=question,
+                    coverage=len(user_scores) / len(spot_times),
+                )
+            )
+
+    return scores
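
To make the aggregation at the end of calculate_spot_times_scores concrete: a user's per-question score is the mean over all requested spot times, with spot times where the user has no score counting as zero, and coverage is the fraction of spot times at which the user was actually scored. A small illustrative sketch with made-up numbers (not from this change):

    # Hypothetical: 3 spot times requested; the user had standing forecasts
    # at only 2 of them, earning spot peer scores of 10.0 and 4.0.
    spot_times_count = 3
    user_scores = [10.0, 4.0]

    score = sum(user_scores) / spot_times_count     # (10.0 + 4.0) / 3 = 4.67
    coverage = len(user_scores) / spot_times_count  # 2 / 3 = 0.67
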
+
+
+@dramatiq.actor
+def update_custom_leaderboard(
+    project_id: int,
+    minimum_time: datetime | None = None,
+    spot_times: list[datetime] | None = None,
+    score_type: ScoreTypes = ScoreTypes.PEER,
+) -> None:
+    project = Project.objects.filter(id=project_id).first()
+    if not project:
+        logger.error(f"Project with id {project_id} does not exist.")
+        return
+    if bool(minimum_time) == bool(spot_times):
+        logger.error("Exactly one of minimum_time or spot_times must be provided.")
+        return
+
+    # setup
+    name = (
+        f"Set open_time for {project.name} at {minimum_time}"
+        if minimum_time
+        else f"Spot scoring for {project.name} at {len(spot_times)} spot times"
+    )
+    leaderboard, _ = Leaderboard.objects.get_or_create(
+        prize_pool=0,
+        name=name,
+        project=project,
+        score_type=LeaderboardScoreTypes.MANUAL,
+    )
+    questions = (
+        leaderboard.get_questions()
+        .filter(
+            related_posts__post__curation_status=Post.CurationStatus.APPROVED,
+            resolution__isnull=False,
+        )
+        .exclude(resolution__in=UnsuccessfulResolutionType)
+    )
+    if not questions.exists():
+        logger.info(f"No resolved questions found for project {project.name}.")
+        return
+    # skip the update if no questions have resolved since the last evaluation
+    existing_entries = leaderboard.entries.all()
+    if existing_entries.exists():
+        last_evaluation_time = max(
+            (entry.calculated_on for entry in existing_entries if entry.calculated_on),
+            default=None,
+        )
+        if last_evaluation_time is not None:
+            newly_resolved_questions = questions.filter(
+                resolution_set_time__gt=last_evaluation_time
+            )
+            if not newly_resolved_questions.exists():
+                logger.info(
+                    "No questions resolved since last evaluation "
+                    f"at {last_evaluation_time}, skipping leaderboard update."
+                )
+                return
+
+    if minimum_time:
+        scores = calculate_minimum_time_scores(questions, minimum_time, score_type)
+
+    if spot_times:
+        if score_type == ScoreTypes.PEER:
+            score_type = ScoreTypes.SPOT_PEER
+        if score_type == ScoreTypes.BASELINE:
+            score_type = ScoreTypes.SPOT_BASELINE
+        scores = calculate_spot_times_scores(questions, spot_times, score_type)
+
+    # temporarily change the leaderboard type for entry creation
+    if score_type in [ScoreTypes.PEER, ScoreTypes.SPOT_PEER]:
+        leaderboard.score_type = LeaderboardScoreTypes.PEER_TOURNAMENT
+    elif score_type in [ScoreTypes.BASELINE, ScoreTypes.SPOT_BASELINE]:
+        leaderboard.score_type = LeaderboardScoreTypes.SPOT_BASELINE_TOURNAMENT
+    else:
+        leaderboard.score_type = score_type
+    new_entries = generate_entries_from_scores(scores, questions, leaderboard)
+    leaderboard.score_type = LeaderboardScoreTypes.MANUAL
+
+    process_entries_for_leaderboard(
+        new_entries, project, leaderboard, force_finalize=False
+    )
+
+    logger.info(f"Updated leaderboard: {leaderboard.name} with id {leaderboard.id}")
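
Because the actor resolves its Leaderboard via get_or_create on the generated name with score_type MANUAL, repeated runs reuse the same leaderboard and process_entries_for_leaderboard simply replaces its entries. A sketch of a one-off manual recalculation from a Django shell; the slug and spot time here are placeholders, not values from this change:

    # Hypothetical one-off run from `python manage.py shell`.
    from datetime import datetime, timezone as dt_timezone

    from projects.models import Project
    from scoring.tasks import update_custom_leaderboard

    project = Project.objects.get(slug="some-project-slug")  # placeholder slug
    update_custom_leaderboard(  # direct call, runs synchronously
        project_id=project.id,
        spot_times=[datetime(2026, 6, 1, tzinfo=dt_timezone.utc)],
    )
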
diff --git a/scoring/utils.py b/scoring/utils.py
index 35169eb6bf..21c612d6aa 100644
--- a/scoring/utils.py
+++ b/scoring/utils.py
@@ -129,10 +129,10 @@ def score_question(
     invalidate_average_coverage_cache([question])
 
 
-def generate_scoring_leaderboard_entries(
+def retrieve_question_scores(
     questions: list[Question],
     leaderboard: Leaderboard,
-) -> list[LeaderboardEntry]:
+) -> list[Score | ArchivedScore]:
     score_type = LeaderboardScoreTypes.get_base_score(leaderboard.score_type) or F(
         "question__default_score_type"
     )
@@ -176,6 +176,14 @@
     scores = list(archived_scores) + list(calculated_scores)
     scores = sorted(scores, key=lambda x: x.user_id or x.score)
 
+    return scores
+
+
+def generate_entries_from_scores(
+    scores: list[Score | ArchivedScore],
+    questions: list[Question],
+    leaderboard: Leaderboard,
+) -> list[LeaderboardEntry]:
     entries: dict[int | AggregationMethod, LeaderboardEntry] = {}
     now = timezone.now()
     maximum_coverage = sum(
@@ -219,6 +227,14 @@
     return sorted(entries.values(), key=lambda entry: entry.score, reverse=True)
 
 
+def generate_scoring_leaderboard_entries(
+    questions: list[Question],
+    leaderboard: Leaderboard,
+) -> list[LeaderboardEntry]:
+    scores = retrieve_question_scores(questions, leaderboard)
+    return generate_entries_from_scores(scores, questions, leaderboard)
+
+
 def generate_comment_insight_leaderboard_entries(
     leaderboard: Leaderboard,
 ) -> list[LeaderboardEntry]:
@@ -714,31 +730,12 @@
     return entries
 
 
-def update_project_leaderboard(
-    project: Project | None = None,
-    leaderboard: Leaderboard | None = None,
-    force_update: bool = False,
+def process_entries_for_leaderboard(
+    entries: list[LeaderboardEntry],
+    project: Project,
+    leaderboard: Leaderboard,
     force_finalize: bool = False,
 ) -> list[LeaderboardEntry]:
-    if project is None and leaderboard is None:
-        raise ValueError("Either project or leaderboard must be provided")
-
-    leaderboard = leaderboard or project.primary_leaderboard
-    project = project or leaderboard.project
-    if not leaderboard:
-        raise ValueError("Leaderboard not found")
-
-    if leaderboard.score_type == LeaderboardScoreTypes.MANUAL:
-        logger.info("%s is manual, not updating", leaderboard.name)
-        return list(leaderboard.entries.all().order_by("rank"))
-
-    if not force_update and leaderboard.finalized:
-        logger.warning("%s is already finalized, not updating", str(leaderboard))
-        return list(leaderboard.entries.all().order_by("rank"))
-
-    # new entries
-    new_entries = generate_project_leaderboard(project, leaderboard)
-
     # assign ranks - also applies exclusions
     bot_status = leaderboard.bot_status or project.bot_leaderboard_status
@@ -746,8 +743,8 @@
         Project.BotLeaderboardStatus.INCLUDE,
     ]
     humans_get_ranks = bot_status != Project.BotLeaderboardStatus.BOTS_ONLY
-    new_entries = assign_ranks(
-        new_entries,
+    entries = assign_ranks(
+        entries,
         leaderboard,
         include_humans=humans_get_ranks,
         include_bots=bots_get_ranks,
@@ -762,10 +759,10 @@
     minimum_prize_percent = (
         float(leaderboard.minimum_prize_amount) / float(prize_pool) if prize_pool else 0
     )
-    new_entries = assign_prize_percentages(new_entries, minimum_prize_percent)
+    entries = assign_prize_percentages(entries, minimum_prize_percent)
     if prize_pool:  # always assign prizes
-        new_entries = assign_prizes(new_entries, prize_pool)
+        entries = assign_prizes(entries, prize_pool)
     # check if we're ready to finalize and assign medals/prizes if applicable
     finalize_time = leaderboard.finalize_time or (
         project.close_date if project else None
     )
@@ -781,7 +778,7 @@
             and project.default_permission == ObjectPermission.FORECASTER
             and project.visibility == Project.Visibility.NORMAL
         ):
-            new_entries = assign_medals(new_entries)
+            entries = assign_medals(entries)
 
         # always set finalize
         Leaderboard.objects.filter(pk=leaderboard.pk).update(finalized=True)
@@ -791,17 +788,47 @@
         for entry in leaderboard.entries.all()
     }
 
-    for new_entry in new_entries:
-        new_entry.leaderboard = leaderboard
-        new_entry.id = previous_entries_map.get(
-            (new_entry.user_id, new_entry.aggregation_method)
-        )
+    for entry in entries:
+        entry.leaderboard = leaderboard
+        entry.id = previous_entries_map.get((entry.user_id, entry.aggregation_method))
 
     with transaction.atomic():
         leaderboard.entries.all().delete()
-        LeaderboardEntry.objects.bulk_create(new_entries, batch_size=500)
+        LeaderboardEntry.objects.bulk_create(entries, batch_size=500)
 
-    return new_entries
+    return entries
+
+
+def update_project_leaderboard(
+    project: Project | None = None,
+    leaderboard: Leaderboard | None = None,
+    force_update: bool = False,
+    force_finalize: bool = False,
+) -> list[LeaderboardEntry]:
+    if project is None and leaderboard is None:
+        raise ValueError("Either project or leaderboard must be provided")
+
+    leaderboard = leaderboard or project.primary_leaderboard
+    project = project or leaderboard.project
+    if not leaderboard:
+        raise ValueError("Leaderboard not found")
+
+    if leaderboard.score_type == LeaderboardScoreTypes.MANUAL:
+        logger.info("%s is manual, not updating", leaderboard.name)
+        return list(leaderboard.entries.all().order_by("rank"))
+
+    if not force_update and leaderboard.finalized:
+        logger.warning("%s is already finalized, not updating", str(leaderboard))
+        return list(leaderboard.entries.all().order_by("rank"))
+
+    # new entries
+    new_entries = generate_project_leaderboard(project, leaderboard)
+
+    # process entries
+    processed_entries = process_entries_for_leaderboard(
+        new_entries, project, leaderboard, force_finalize=force_finalize
+    )
+    return processed_entries
 
 
 def update_leaderboard_from_csv_data(
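
The scoring/utils.py changes are an extract-function refactor with no behavior change: generate_scoring_leaderboard_entries is now the composition of retrieve_question_scores and generate_entries_from_scores, which is what lets scoring/tasks.py feed simulated scores into the same entry-generation path, and update_project_leaderboard delegates its ranking/prize/medal/persistence tail to process_entries_for_leaderboard. A sketch of the resulting equivalence, using only names from this diff:

    # After the refactor these two paths produce the same leaderboard entries:
    entries_a = generate_scoring_leaderboard_entries(questions, leaderboard)

    scores = retrieve_question_scores(questions, leaderboard)
    entries_b = generate_entries_from_scores(scores, questions, leaderboard)
    # entries_a == entries_b by construction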