Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions misc/management/commands/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from scoring.jobs import (
finalize_leaderboards,
update_global_comment_and_question_leaderboards,
update_custom_leaderboards,
)
from scoring.utils import update_medal_points_and_ranks

Expand Down Expand Up @@ -200,6 +201,13 @@ def handle(self, *args, **options):
max_instances=1,
replace_existing=True,
)
scheduler.add_job(
close_old_connections(update_custom_leaderboards),
trigger=CronTrigger.from_crontab("0 5 * * *"), # Every day at 05:00 UTC
id="update_custom_leaderboards",
max_instances=1,
replace_existing=True,
)

#
# Comment Jobs
Expand Down
40 changes: 40 additions & 0 deletions scoring/jobs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import logging

from django.utils import timezone
from datetime import datetime, timezone as dt_timezone

from projects.models import Project
from scoring.constants import LeaderboardScoreTypes
from scoring.models import Leaderboard
from scoring.utils import update_project_leaderboard
from scoring.tasks import update_custom_leaderboard

logger = logging.getLogger(__name__)

Expand All @@ -30,3 +33,40 @@ def finalize_leaderboards():
if finalize_time and finalize_time <= timezone.now():
logger.info(f"Finalizing leaderboard: {leaderboard}")
update_project_leaderboard(leaderboard=leaderboard)


def update_custom_leaderboards():
    """
    Trigger the custom leaderboard updates.

    Leaderboards to update are hardcoded here.
    If adding more, be sure failures are handled gracefully so one failing
    leaderboard does not prevent the others from updating.
    """

    # US Democracy Threat Index
    project = Project.objects.filter(
        slug="us-democracy-threat",
        type=Project.ProjectTypes.INDEX,
    ).first()
    if project:
        try:
            update_custom_leaderboard(
                project_id=project.id,
                minimum_time=datetime(2025, 12, 12, tzinfo=dt_timezone.utc),
                spot_times=None,
            )
            # TODO: add spot times as they become determined
            # update_custom_leaderboard(
            #     project_id=project.id,
            #     minimum_time=None,
            #     spot_times=[datetime(2026, 1, 1, tzinfo=dt_timezone.utc)],
            # )
        except Exception:
            # logger.exception keeps the full traceback (logger.error with a
            # bare f-string discarded it); lazy %s args avoid eager formatting
            logger.exception(
                "Error updating custom leaderboard for project '%s'",
                project.name,
            )
    else:
        # don't warn or error because this project doesn't necessarily exist
        # in all environments
        logger.info("Index 'us-democracy-threat' not found.")
170 changes: 170 additions & 0 deletions scoring/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import logging
from datetime import datetime
from collections import defaultdict

from django.db.models import QuerySet
import dramatiq

from posts.models import Post
from scoring.constants import LeaderboardScoreTypes, ScoreTypes
from scoring.models import Leaderboard, Score
from scoring.score_math import evaluate_question
from projects.models import Project
from questions.models import Question
from questions.constants import UnsuccessfulResolutionType
from scoring.utils import generate_entries_from_scores, process_entries_for_leaderboard

logger = logging.getLogger(__name__)


def calculate_minimum_time_scores(
    questions: QuerySet[Question],
    minimum_time: datetime,
    score_type: ScoreTypes = ScoreTypes.PEER,
) -> list[Score]:
    """
    Return scores for each question as if none opened before ``minimum_time``.

    Questions already open at or after ``minimum_time`` keep their stored
    scores; earlier ones are re-evaluated with their open_time shifted
    forward. The shift is in-memory only — the question is never saved.
    """
    scores: list[Score] = []

    total = questions.count()
    for i, question in enumerate(questions, start=1):
        # lazy %s args: formatting only happens if INFO is enabled
        logger.info("Processing question %s/%s (ID: %s)", i, total, question.id)
        if question.open_time >= minimum_time:
            # stored scores already reflect the desired open_time
            scores.extend(question.scores.filter(score_type=score_type))
            continue
        # simulate scores as if question open_time was minimum_time
        # (mutation is in-memory only; question is not saved)
        question.open_time = minimum_time
        new_scores = evaluate_question(
            question=question,
            resolution=question.resolution,
            score_types=[score_type],
        )
        scores.extend(new_scores)

    return scores


def calculate_spot_times_scores(
    questions: QuerySet[Question],
    spot_times: list[datetime],
    score_type: ScoreTypes = ScoreTypes.SPOT_PEER,
) -> list[Score]:
    """
    Return one averaged spot score per (user, aggregation_method) per question.

    Each question is evaluated once per spot time, as if its spot scoring
    time were that time. Per-user scores are then averaged over ALL spot
    times, so a user missing a forecast at some spot time is effectively
    scored 0 for it; ``coverage`` records the fraction of spot times covered.
    """
    scores: list[Score] = []
    # hoisted loop invariant: divisor for both score and coverage
    n_spots = len(spot_times)

    total = questions.count()
    for i, question in enumerate(questions, start=1):
        # lazy %s args: formatting only happens if INFO is enabled
        logger.info("Processing question %s/%s (ID: %s)", i, total, question.id)
        question_scores: list[Score] = []
        for spot_time in spot_times:
            # simulate scores as if question spot_scoring_time was spot_time
            new_scores = evaluate_question(
                question=question,
                resolution=question.resolution,
                score_types=[score_type],
                spot_forecast_time=spot_time,
            )
            question_scores.extend(new_scores)
        # group per (user, aggregation_method) before averaging
        user_score_map = defaultdict(list)
        for score in question_scores:
            user_score_map[(score.user_id, score.aggregation_method)].append(
                score.score
            )
        for (user_id, aggregation_method), user_scores in user_score_map.items():
            scores.append(
                Score(
                    user_id=user_id,
                    aggregation_method=aggregation_method,
                    # divide by n_spots (not len(user_scores)): missing
                    # spot forecasts are penalized as zeros
                    score=sum(user_scores) / n_spots,
                    score_type=score_type,
                    question=question,
                    coverage=len(user_scores) / n_spots,
                )
            )

    return scores


@dramatiq.actor
def update_custom_leaderboard(
    project_id: int,
    minimum_time: datetime | None = None,
    spot_times: list[datetime] | None = None,
    score_type: ScoreTypes = ScoreTypes.PEER,
) -> None:
    """
    Create or refresh a MANUAL-scored leaderboard for a project using
    simulated scores.

    Exactly one of ``minimum_time`` or ``spot_times`` must be provided:
    - ``minimum_time``: score questions as if none opened before that time.
    - ``spot_times``: score questions at each spot time and average.

    Skips the update when no question has resolved since the last
    evaluation. NOTE(review): this is a dramatiq actor — if dispatched via
    ``.send()``, the ``datetime`` arguments must survive message
    serialization; confirm the broker encoder handles them.
    """
    project = Project.objects.filter(id=project_id).first()
    if not project:
        logger.error(f"Project with id {project_id} does not exist.")
        return
    # XOR guard: rejects both-missing and both-provided.
    # NOTE(review): an empty spot_times list is falsy and is treated as
    # "not provided" — presumably intended; confirm.
    if bool(minimum_time) == bool(spot_times):
        logger.error("minimum_time or spot_times must be provided, but not both.")
        return

    # setup
    # the name encodes the parameters, so get_or_create below yields a
    # distinct leaderboard per (project, minimum_time/spot_times) variant
    name = (
        f"Set open_time for {project.name} at {minimum_time}"
        if minimum_time
        else (f"Spot time for {project.name} at {len(spot_times)} spot times")
    )
    leaderboard, _ = Leaderboard.objects.get_or_create(
        prize_pool=0,
        name=name,
        project=project,
        score_type=LeaderboardScoreTypes.MANUAL,
    )
    # only approved posts with successful (non-ambiguous/annulled) resolutions
    questions = (
        leaderboard.get_questions()
        .filter(
            related_posts__post__curation_status=Post.CurationStatus.APPROVED,
            resolution__isnull=False,
        )
        .exclude(resolution__in=UnsuccessfulResolutionType)
    )
    if not questions.exists():
        logger.info(f"No resolved questions found for project {project.name}.")
        return
    # detect if any questions actually resolved since last evaluation
    existing_entries = leaderboard.entries.all()
    if existing_entries.exists():
        # NOTE(review): raises ValueError if entries exist but every
        # calculated_on is None — confirm calculated_on is always set
        last_evaluation_time = max(
            entry.calculated_on for entry in existing_entries if entry.calculated_on
        )
        newly_resolved_questions = questions.filter(
            resolution_set_time__gt=last_evaluation_time
        )
        if not newly_resolved_questions.exists():
            logger.info(
                "No questions resolved since last evaluation "
                f"at {last_evaluation_time}, skipping leaderboard update."
            )
            return

    if minimum_time:
        scores = calculate_minimum_time_scores(questions, minimum_time, score_type)

    if spot_times:
        # spot scoring requires the spot variant of the score type
        if score_type == ScoreTypes.PEER:
            score_type = ScoreTypes.SPOT_PEER
        if score_type == ScoreTypes.BASELINE:
            score_type = ScoreTypes.SPOT_BASELINE
        scores = calculate_spot_times_scores(questions, spot_times, score_type)

    # temporarily change leaderboard type for entry creation
    # (generate_entries_from_scores presumably branches on score_type;
    # the MANUAL type is restored before processing entries)
    if score_type in [ScoreTypes.PEER, ScoreTypes.SPOT_PEER]:
        leaderboard.score_type = LeaderboardScoreTypes.PEER_TOURNAMENT
    elif score_type in [ScoreTypes.BASELINE, ScoreTypes.SPOT_BASELINE]:
        # NOTE(review): asymmetric with the PEER branch above (which maps
        # both plain and spot variants to PEER_TOURNAMENT) — should plain
        # BASELINE map to a non-spot baseline tournament type? Confirm.
        leaderboard.score_type = LeaderboardScoreTypes.SPOT_BASELINE_TOURNAMENT
    else:
        leaderboard.score_type = score_type
    new_entries = generate_entries_from_scores(scores, questions, leaderboard)
    # restore the persistent MANUAL type (in-memory; not saved here)
    leaderboard.score_type = LeaderboardScoreTypes.MANUAL

    process_entries_for_leaderboard(
        new_entries, project, leaderboard, force_finalize=False
    )

    logger.info(f"Updated leaderboard: {leaderboard.name} with id {leaderboard.id}")
    return
Loading