347 changes: 347 additions & 0 deletions script/transform_csv/convert.py

Large diffs are not rendered by default.

20,001 changes: 20,001 additions & 0 deletions script/transform_csv/data.csv

Large diffs are not rendered by default.

20,001 changes: 20,001 additions & 0 deletions script/transform_csv/measurement.csv

Large diffs are not rendered by default.

540,001 changes: 540,001 additions & 0 deletions script/transform_csv/observation.csv

Large diffs are not rendered by default.

20,001 changes: 20,001 additions & 0 deletions script/transform_csv/person.csv

Large diffs are not rendered by default.

92 changes: 92 additions & 0 deletions script/transform_csv/verification.py
@@ -0,0 +1,92 @@
#!/usr/bin/env python3
"""
verification.py

Quick sanity-checks for the four OMOP CSVs produced by convert.py.

• Ensures every person_id referenced in measurement.csv, observation.csv,
  and visit_occurrence.csv exists in person.csv
• Verifies that the primary-key columns (person_id, measurement_id,
  observation_id, visit_occurrence_id) are unique
• Prints a concise PASS / FAIL summary

Run *after* you have executed convert.py:

    python3 verification.py
"""

import pandas as pd
from pathlib import Path

# ---------------------------------------------------------------------------

def load_csv(path: str, required: bool = True) -> pd.DataFrame | None:
    if not Path(path).exists():
        if required:
            raise FileNotFoundError(f"{path} not found")
        return None
    return pd.read_csv(path)

# load files
person_df = load_csv("person.csv")
measurement_df = load_csv("measurement.csv")
observation_df = load_csv("observation.csv")
visit_df = load_csv("visit_occurrence.csv", required=False)  # may be absent

# ---------------------------------------------------------------------------
# Helper for subset checks

def subset_ok(child_series: pd.Series, parent_set: set[int]) -> bool:
    """Return True if *all* IDs in child_series are contained in parent_set."""
    return set(child_series.unique()).issubset(parent_set)

# Primary-key uniqueness checks
checks = []

def unique_ok(df: pd.DataFrame, col: str) -> bool:
    """Verify the column has unique non-null values."""
    return df[col].is_unique and df[col].notna().all()

checks.append(("person_id uniqueness", unique_ok(person_df, "person_id")))
checks.append((
    "measurement_id uniqueness",
    unique_ok(measurement_df, "measurement_id"),
))
checks.append((
    "observation_id uniqueness",
    unique_ok(observation_df, "observation_id"),
))
if visit_df is not None:
    checks.append((
        "visit_occurrence_id uniqueness",
        unique_ok(visit_df, "visit_occurrence_id"),
    ))

# FK-style subset checks
person_ids = set(person_df["person_id"].unique())

checks.append((
    "measurement.person_id ⊆ person.person_id",
    subset_ok(measurement_df["person_id"], person_ids),
))
checks.append((
    "observation.person_id ⊆ person.person_id",
    subset_ok(observation_df["person_id"], person_ids),
))
if visit_df is not None:
    checks.append((
        "visit_occurrence.person_id ⊆ person.person_id",
        subset_ok(visit_df["person_id"], person_ids),
    ))

# ---------------------------------------------------------------------------
# Print summary

all_ok = True
print("\nValidation results:")
for name, result in checks:
    status = "PASS" if result else "FAIL"
    print(f"  {status:<4} {name}")
    all_ok &= result

print("\nOverall:", "✔︎ ALL CHECKS PASSED" if all_ok else "✘ SOME CHECKS FAILED")
Copilot AI Jul 11, 2025
The validation script prints a PASS/FAIL summary but always exits with status 0. Consider using sys.exit(1) when all_ok is false so that CI jobs can detect failures automatically.

Suggested change
-print("\nOverall:", "✔︎ ALL CHECKS PASSED" if all_ok else "✘ SOME CHECKS FAILED")
+print("\nOverall:", "✔︎ ALL CHECKS PASSED" if all_ok else "✘ SOME CHECKS FAILED")
+if not all_ok:
+    sys.exit(1)

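One gap in the suggestion: verification.py imports only pandas and pathlib, so sys.exit(1) would raise NameError as written. A minimal sketch of the complete fix for the tail of the script, with the missing import added:

    import sys  # required by the suggested change; would normally go at the top of the file

    print("\nOverall:", "✔︎ ALL CHECKS PASSED" if all_ok else "✘ SOME CHECKS FAILED")
    if not all_ok:
        sys.exit(1)  # non-zero exit status lets CI jobs detect the failure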
20,001 changes: 20,001 additions & 0 deletions script/transform_csv/visit_occurrence.csv

Large diffs are not rendered by default.

59 changes: 57 additions & 2 deletions src/analytics/repository/analytics_repository.py
@@ -1,10 +1,65 @@
+from datetime import datetime, timezone
+
+from sqlalchemy.dialects.postgresql import insert
+from sqlalchemy.orm import Session
+
 from src.analytics.model.analytics import Analytics


 class AnalyticsRepository:
-    def __init__(self, session):  # pragma: no cover
-        self.session = session
+    """
+    Handles persistence for Analytics. add_or_update() performs a
+    PostgreSQL ON CONFLICT … DO UPDATE to satisfy unique constraint
+    uq_analytics_user_case without raising IntegrityError.
+    """
+
+    def __init__(self, session: Session):  # pragma: no cover
+        self.session: Session = session
+
+    def add_or_update(self, analytics: Analytics) -> Analytics:  # pragma: no cover
+        """
+        Insert OR update by (user_email, case_config_id).
+        Returns the resulting Analytics ORM object attached to the session.
+        """
+        stmt = (
+            insert(Analytics)
+            .values(
+                user_email=analytics.user_email,
+                case_config_id=analytics.case_config_id,
+                case_id=analytics.case_id,
+                case_open_time=analytics.case_open_time,
+                answer_open_time=analytics.answer_open_time,
+                answer_submit_time=analytics.answer_submit_time,
+                to_answer_open_secs=analytics.to_answer_open_secs,
+                to_submit_secs=analytics.to_submit_secs,
+                total_duration_secs=analytics.total_duration_secs,
+                created_timestamp=analytics.created_timestamp,
+                modified_timestamp=analytics.modified_timestamp,
+            )
+            .on_conflict_do_update(
+                index_elements=["user_email", "case_config_id"],
+                set_=dict(
+                    case_id=analytics.case_id,
+                    case_open_time=analytics.case_open_time,
+                    answer_open_time=analytics.answer_open_time,
+                    answer_submit_time=analytics.answer_submit_time,
+                    to_answer_open_secs=analytics.to_answer_open_secs,
+                    to_submit_secs=analytics.to_submit_secs,
+                    total_duration_secs=analytics.total_duration_secs,
+                    modified_timestamp=datetime.now(timezone.utc),
+                ),
+            )
+            .returning(Analytics)
+        )
+
+        result = self.session.execute(stmt)
+        self.session.flush()  # keep session state in sync
+        return result.scalars().first()
+
+    # ------------------------------------------------------------------ #
+    # Legacy method retained for parts of the codebase that still expect
+    # the old "add" contract (will raise on duplicate key).
+    # ------------------------------------------------------------------ #
     def add(self, analytics: Analytics) -> Analytics:  # pragma: no cover
         self.session.add(analytics)
         self.session.flush()
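The ON CONFLICT target in add_or_update() only resolves if the analytics table declares a matching unique constraint. A rough sketch of how that might look on the model, assuming SQLAlchemy declarative mapping; apart from the constraint name uq_analytics_user_case and the two key columns taken from the diff above, every detail here is a guess:

    from sqlalchemy import Column, DateTime, Float, Integer, String, UniqueConstraint
    from sqlalchemy.orm import declarative_base

    Base = declarative_base()

    class Analytics(Base):  # hypothetical sketch, not the project's actual model
        __tablename__ = "analytics"
        __table_args__ = (
            # index_elements=["user_email", "case_config_id"] in the upsert
            # resolves against this constraint
            UniqueConstraint("user_email", "case_config_id", name="uq_analytics_user_case"),
        )

        id = Column(Integer, primary_key=True)  # assumed surrogate key
        user_email = Column(String, nullable=False)
        case_config_id = Column(String, nullable=False)
        total_duration_secs = Column(Float)
        created_timestamp = Column(DateTime(timezone=True))
        modified_timestamp = Column(DateTime(timezone=True))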
24 changes: 19 additions & 5 deletions src/analytics/service/analytics_service.py
@@ -1,4 +1,5 @@
-from datetime import datetime
+from datetime import datetime, timezone
+
 from src.analytics.model.analytics import Analytics
 from src.analytics.repository.analytics_repository import AnalyticsRepository
 from src.common.exception.BusinessException import (
@@ -10,6 +11,12 @@


 class AnalyticsService:
+    """
+    High-level service that records timing metrics for a user’s case review.
+    Uses an UPSERT, so repeated calls for the same (user_email, case_config_id)
+    update the existing Analytics row instead of throwing UniqueViolation.
+    """
+
     def __init__(
         self,
         analytics_repository: AnalyticsRepository,
@@ -18,24 +25,27 @@ def __init__(
         self.analytics_repo = analytics_repository
         self.config_repo = display_config_repository

+    # ------------------------------------------------------------------ #
     def record_metrics(
         self,
         case_config_id: str,
         case_open: datetime,
         answer_open: datetime,
         answer_submit: datetime,
     ) -> Analytics:  # pragma: no cover
+        """Validate ownership, compute durations, then upsert Analytics."""
+        user_email: str = get_user_email_from_jwt()

-        # verify user owns this case_config
+        # confirm caller owns this config
         config = self.config_repo.get_configuration_by_id(case_config_id)
-        user_email = get_user_email_from_jwt()
         if not config or config.user_email != user_email:
             raise BusinessException(BusinessExceptionEnum.NoAccessToCaseReview)

-        # durations in seconds
+        # durations (sec)
         to_answer_open = (answer_open - case_open).total_seconds()
         to_submit = (answer_submit - answer_open).total_seconds()
         total = (answer_submit - case_open).total_seconds()
+        now = datetime.now(timezone.utc)

         analytics = Analytics(
             user_email=user_email,
@@ -47,5 +57,9 @@ def record_metrics(
             to_answer_open_secs=to_answer_open,
             to_submit_secs=to_submit,
             total_duration_secs=total,
+            created_timestamp=now,
+            modified_timestamp=now,
         )
-        return self.analytics_repo.add(analytics)
+
+        # use UPSERT instead of plain add()
+        return self.analytics_repo.add_or_update(analytics)
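Since record_metrics() now delegates to add_or_update(), repeated submissions for the same user and case config refresh the stored metrics instead of raising. A hypothetical usage sketch; the wired-up service instance is assumed:

    from datetime import datetime, timedelta, timezone

    t0 = datetime.now(timezone.utc)
    first = service.record_metrics("cfg-1", t0, t0 + timedelta(seconds=30), t0 + timedelta(seconds=90))
    # same (user_email, case_config_id): the existing row is updated in place,
    # so no UniqueViolation is raised on the second call
    second = service.record_metrics("cfg-1", t0, t0 + timedelta(seconds=45), t0 + timedelta(seconds=95))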
18 changes: 15 additions & 3 deletions src/answer/repository/answer_repository.py
@@ -1,19 +1,31 @@
 from typing import List

 from sqlalchemy import select
+from sqlalchemy.orm import Session

 from src.answer.model.answer import Answer


 class AnswerRepository:
-    def __init__(self, session):
+    """
+    Persists Answer rows.
+
+    Key fix: add_answer() commits the session so the row survives the request.
+    """
+
+    def __init__(self, session: Session):  # pragma: no cover
         self.session = session

-    def add_answer(self, answer: Answer):
+    # ------------------------------------------------------------------ #
+    def add_answer(self, answer: Answer) -> Answer:  # pragma: no cover
+        """
+        Insert + commit. Returns the persisted Answer instance.
+        """
         self.session.add(answer)
         self.session.flush()
+        self.session.commit()
+        return answer

+    # ------------------------------------------------------------------ #
     def get_answered_case_list_by_user(self, user_email: str) -> List[str]:
         statement = (
             select(Answer.task_id)
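The reason flush() alone was losing rows: flush only sends the pending INSERT to the database inside the still-open transaction, and the row disappears if that transaction is never committed. A minimal illustration, assuming a plain SQLAlchemy session and an Answer instance:

    session.add(answer)
    session.flush()     # INSERT emitted; row visible only inside this transaction
    session.rollback()  # row vanishes, flush alone is not durable

    session.add(answer)
    session.commit()    # transaction ends; the row survives the request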
6 changes: 3 additions & 3 deletions src/answer/service/answer_service.py
@@ -23,7 +23,7 @@ def __init__(
         self.answer_config_repository = answer_config_repository

     def add_answer_response(self, task_id: int, data: dict):
-        user_eamil = auth_utils.get_user_email_from_jwt()
+        user_email = auth_utils.get_user_email_from_jwt()

         answer = data["answer"]
         answer_config_id = data["answerConfigId"]
@@ -32,7 +32,7 @@ def add_answer_response(self, task_id: int, data: dict):

         configuration = self.configuration_repository.get_configuration_by_id(task_id)

-        if not configuration or configuration.user_email != user_eamil:
+        if not configuration or configuration.user_email != user_email:
             raise BusinessException(BusinessExceptionEnum.NoAccessToCaseReview)

         answer_config = self.answer_config_repository.get_answer_config(
@@ -44,7 +44,7 @@ def add_answer_response(self, task_id: int, data: dict):
         diagnose = Answer(
             task_id=task_id,
             case_id=configuration.case_id,
-            user_email=user_eamil,
+            user_email=user_email,
             ai_score_shown=ai_shown,
             display_configuration=configuration.path_config,
             answer_config_id=answer_config.id,
2 changes: 2 additions & 0 deletions src/user/service/configuration_service.py
@@ -36,4 +36,6 @@ def process_csv_file(self, file_stream: StringIO) -> list[dict[str, str]]:
                 result["status"] = "failed"
             responses.append(result)

+        self.repository.session.commit()
Copilot AI Jul 11, 2025
[nitpick] Committing directly in the service layer couples transaction management with business logic. Consider encapsulating commit behavior inside the repository or using a dedicated transaction manager to centralize persistence concerns.

Suggested change
-self.repository.session.commit()
+self.repository.commit_transaction()
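The commit_transaction() helper in the suggestion does not exist in the repository yet; a sketch of what such a wrapper could look like, with the method name taken from the suggestion and the error handling an assumption:

    from sqlalchemy.orm import Session

    class ConfigurationRepository:  # hypothetical excerpt
        def __init__(self, session: Session):
            self.session = session

        def commit_transaction(self) -> None:
            """Own the transaction boundary so services never touch the session."""
            try:
                self.session.commit()
            except Exception:
                self.session.rollback()
                raise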

+
         return responses