diff --git a/src/config.py b/src/config.py index dfc3651..8b9615e 100644 --- a/src/config.py +++ b/src/config.py @@ -40,6 +40,10 @@ class Settings(BaseSettings): SERVICE_API_KEY: str = "default_service_secret_change_me" ADMIN_API_KEY: str = "default_admin_secret_change_me" + KNOWN_LOCATIONS: str = ( + "lagos,abuja,port harcourt,kano,ibadan,benin,kaduna,jos,enugu,calabar," + "owerri,warri,uyo,akure,ilorin,sokoto,zaria,maiduguri,asaba,nnewi" + ) class Config: env_file = ".env" diff --git a/src/etl/__init__.py b/src/etl/__init__.py index 3c43824..4af9edb 100644 --- a/src/etl/__init__.py +++ b/src/etl/__init__.py @@ -1,5 +1,5 @@ import logging -from datetime import date, datetime +from datetime import date, datetime, timezone from typing import Any, Dict, List, Optional, Tuple from sqlalchemy import ( @@ -11,6 +11,7 @@ String, Table, TIMESTAMP, + create_engine, text, ) from sqlalchemy.dialects.postgresql import insert as pg_insert @@ -21,7 +22,7 @@ except Exception: bigquery = None # Optional dependency -from src.logging_config import ETL_JOBS_TOTAL, log_error, log_info +from src.logging_config import ETL_JOBS_TOTAL, log_error, log_info, log_warning from src.config import get_settings import src.db as _db from .extract import extract_events_and_sales @@ -47,6 +48,152 @@ def _safe_float(x: Any) -> float: return 0.0 +# --------------------------------------------------------------------------- +# Validation (issue #162) +# --------------------------------------------------------------------------- + +def validate_rows( + event_summary_rows: List[Dict[str, Any]], + daily_rows: List[Dict[str, Any]], +) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], int]: + """Reject malformed rows before they reach the database. + + Rules: + - event_summary row: ``event_id`` must be non-empty. + - event_summary row: ``total_tickets`` and ``total_revenue`` must be >= 0. + - daily row: ``event_id`` must be non-empty. + - daily row: ``sale_date`` must not be more than 1 day in the future. + + Every rejected row is logged as a warning so it remains traceable. + + Returns: + (valid_event_rows, valid_daily_rows, rejected_count) + """ + rejected_count = 0 + valid_event_rows: List[Dict[str, Any]] = [] + today = datetime.now(tz=timezone.utc).date() + + for row in event_summary_rows: + event_id = row.get("event_id") + if not event_id: + log_warning("ETL validate: rejected event row — empty event_id", {"row": row}) + rejected_count += 1 + continue + if _safe_int(row.get("total_tickets", 0)) < 0: + log_warning( + "ETL validate: rejected event row — negative total_tickets", + {"row": row}, + ) + rejected_count += 1 + continue + if _safe_float(row.get("total_revenue", 0.0)) < 0: + log_warning( + "ETL validate: rejected event row — negative total_revenue", + {"row": row}, + ) + rejected_count += 1 + continue + valid_event_rows.append(row) + + valid_daily_rows: List[Dict[str, Any]] = [] + for row in daily_rows: + event_id = row.get("event_id") + if not event_id: + log_warning("ETL validate: rejected daily row — empty event_id", {"row": row}) + rejected_count += 1 + continue + sale_date = row.get("sale_date") + if sale_date is not None: + if isinstance(sale_date, str): + try: + sale_date = datetime.fromisoformat(sale_date).date() + except Exception: + sale_date = None + if isinstance(sale_date, datetime): + sale_date = sale_date.date() + if isinstance(sale_date, date) and sale_date > today + __import__("datetime").timedelta(days=1): + log_warning( + "ETL validate: rejected daily row — sale_date is in the future", + {"row": row}, + ) + rejected_count += 1 + continue + valid_daily_rows.append(row) + + return valid_event_rows, valid_daily_rows, rejected_count + + +# --------------------------------------------------------------------------- +# ETL run log cursor helpers (issue #161) +# --------------------------------------------------------------------------- + +_ETL_RUN_LOG_TABLE = "etl_run_log" + + +def _ensure_run_log_table(engine: Engine) -> Table: + """Create etl_run_log table if it does not exist and return the Table object.""" + metadata = MetaData() + run_log = Table( + _ETL_RUN_LOG_TABLE, + metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("started_at", TIMESTAMP(timezone=False)), + Column("finished_at", TIMESTAMP(timezone=False)), + Column("status", String), # 'success' | 'failed' + Column("last_run_at", TIMESTAMP(timezone=False)), + Column("rejected_count", Integer), + ) + with engine.begin() as conn: + metadata.create_all(conn) # type: ignore[arg-type] + return run_log + + +def _load_last_successful_cursor(engine: Engine) -> Optional[str]: + """Return the ISO-8601 ``finished_at`` of the last successful ETL run, or None.""" + try: + with engine.connect() as conn: + row = conn.execute( + text( + f"SELECT finished_at FROM {_ETL_RUN_LOG_TABLE} " # noqa: S608 + "WHERE status = 'success' " + "ORDER BY finished_at DESC " + "LIMIT 1" + ) + ).fetchone() + if row and row[0]: + ts = row[0] + if isinstance(ts, datetime): + return ts.isoformat() + return str(ts) + except Exception as exc: + logger.warning("Could not read ETL run log cursor: %s", exc) + return None + + +def _save_run_log( + engine: Engine, + run_log_table: Table, + started_at: datetime, + finished_at: datetime, + status: str, + rejected_count: int = 0, +) -> None: + """Insert a row into etl_run_log.""" + try: + with engine.begin() as conn: + conn.execute( + run_log_table.insert().values( + started_at=started_at, + finished_at=finished_at, + status=status, + last_run_at=finished_at if status == "success" else None, + rejected_count=rejected_count, + ) + ) + except Exception as exc: + logger.error("Failed to write ETL run log: %s", exc) + + def transform_summary( events: List[Dict[str, Any]], sales: List[Dict[str, Any]], @@ -381,19 +528,60 @@ def diff_etl_output( # --------------------------------------------------------------------------- def run_etl_once() -> None: - """Run a single full ETL cycle: extract → transform → load.""" + """Run a single full ETL cycle: extract → transform → validate → load.""" + started_at = datetime.utcnow() log_info("ETL job started") - events, sales = extract_events_and_sales() - ev_rows, daily_rows = transform_summary( - [event.raw for event in events], - [sale.raw for sale in sales], - ) - try: - load_postgres(ev_rows, daily_rows) - except Exception as exc: - log_error("Postgres load failed", {"error": str(exc)}) + + # --- Cursor: load last successful run timestamp (issue #161) --- + engine = _pg_engine() + run_log_table: Optional[Table] = None + since: Optional[str] = None + if engine is not None: + try: + run_log_table = _ensure_run_log_table(engine) + since = _load_last_successful_cursor(engine) + except Exception as exc: + log_error("Failed to initialise ETL run log", {"error": str(exc)}) + + log_info("ETL extract cursor", {"since": since}) + + rejected_count = 0 + status = "failed" try: - load_bigquery(ev_rows, daily_rows) - except Exception as exc: - log_error("BigQuery load failed", {"error": str(exc)}) - log_info("ETL job completed") \ No newline at end of file + events, sales = extract_events_and_sales(since=since) + ev_rows, daily_rows = transform_summary( + [event.raw for event in events], + [sale.raw for sale in sales], + ) + + # --- Validation step (issue #162) --- + ev_rows, daily_rows, rejected_count = validate_rows(ev_rows, daily_rows) + if rejected_count: + log_warning( + "ETL validation rejected rows", + {"rejected_count": rejected_count}, + ) + + try: + load_postgres(ev_rows, daily_rows) + except Exception as exc: + log_error("Postgres load failed", {"error": str(exc)}) + raise + try: + load_bigquery(ev_rows, daily_rows) + except Exception as exc: + log_error("BigQuery load failed", {"error": str(exc)}) + + status = "success" + finally: + finished_at = datetime.utcnow() + if engine is not None and run_log_table is not None: + _save_run_log( + engine, + run_log_table, + started_at=started_at, + finished_at=finished_at, + status=status, + rejected_count=rejected_count, + ) + log_info("ETL job completed", {"status": status, "rejected_count": rejected_count}) \ No newline at end of file diff --git a/src/etl/extract.py b/src/etl/extract.py index f3e1a37..5f1bd84 100644 --- a/src/etl/extract.py +++ b/src/etl/extract.py @@ -172,22 +172,36 @@ def _to_ticket_sale_record(item: Dict[str, Any]) -> TicketSaleRecord: ) -def extract_events_and_sales() -> Tuple[List[EventRecord], List[TicketSaleRecord]]: - """Fetch all events (paginated) and all ticket-sales from the upstream API.""" +def extract_events_and_sales( + since: Optional[str] = None, +) -> Tuple[List[EventRecord], List[TicketSaleRecord]]: + """Fetch events and ticket-sales from the upstream API. + + Args: + since: Optional ISO-8601 datetime string. When provided, the + ``?since=`` query parameter is forwarded to both the ``/events`` + and ``/ticket-sales`` endpoints so that only records created after + that timestamp are returned (incremental extract). On the very + first run (no previous successful cursor) pass ``None`` to fetch + all records. + """ settings = get_settings() base_url = settings.NEST_API_BASE_URL.rstrip("/") headers = _auth_headers() events: List[EventRecord] = [] + since_params: Dict[str, Any] = {"since": since} if since else {} + with httpx.Client(timeout=REQUEST_TIMEOUT_SECONDS) as client: page = 1 while True: + params: Dict[str, Any] = {"page": page, **since_params} response = _request_with_retry( client=client, url=f"{base_url}/events", headers=headers, dataset="events", - params={"page": page}, + params=params, ) payload: Any = response.json() items = _normalize_items(payload) @@ -202,6 +216,7 @@ def extract_events_and_sales() -> Tuple[List[EventRecord], List[TicketSaleRecord url=f"{base_url}/ticket-sales", headers=headers, dataset="ticket-sales", + params=since_params if since_params else None, ) sales_payload: Any = sales_response.json() sales_items = _normalize_items(sales_payload) @@ -209,6 +224,6 @@ def extract_events_and_sales() -> Tuple[List[EventRecord], List[TicketSaleRecord log_info( "ETL extract completed", - {"events_count": len(events), "sales_count": len(sales)}, + {"events_count": len(events), "sales_count": len(sales), "since": since}, ) return events, sales \ No newline at end of file diff --git a/src/main.py b/src/main.py index 17d51a2..3904d00 100644 --- a/src/main.py +++ b/src/main.py @@ -52,6 +52,11 @@ RevenueShareConfig, ) from src.revenue_sharing_service import revenue_sharing_service +from src.recommender import ( + build_item_similarity_matrix, + get_item_recommendations, + get_user_events_from_db, +) from src.search_utils import extract_keywords, filter_events_by_keywords from src.types_custom import ( AnalyticsInvalidAttemptsResponse, @@ -408,18 +413,17 @@ def search_events(payload: SearchEventsRequest) -> Any: @app.post("/recommend-events", response_model=RecommendResponse) def recommend_events(payload: RecommendRequest) -> RecommendResponse: user_id = payload.user_id - if user_id not in mock_user_events: - raise HTTPException(status_code=404, detail={"message": "User not found"}) - user_evts: set[str] = set(mock_user_events[user_id]) - scores: Dict[str, int] = {} - for other_user, events in mock_user_events.items(): - if other_user == user_id: - continue - overlap = len(user_evts.intersection(events)) - for e in events: - if e not in user_evts: - scores[e] = scores.get(e, 0) + overlap - recommended = sorted(scores, key=lambda k: scores[k], reverse=True)[:3] + # Prefer DB-sourced history; fall back to mock data when DB is unavailable. + user_events_dict = get_user_events_from_db() + if not user_events_dict: + user_events_dict = mock_user_events + similarity_matrix = build_item_similarity_matrix(user_events_dict) + recommended = get_item_recommendations( + user_id=user_id, + user_events_dict=user_events_dict, + similarity_matrix=similarity_matrix, + top_n=3, + ) return RecommendResponse(recommendations=recommended) diff --git a/src/mock_events.py b/src/mock_events.py index b321a1f..7328175 100644 --- a/src/mock_events.py +++ b/src/mock_events.py @@ -37,6 +37,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Experience the best of Afrobeats music with top Nigerian artists", "event_type": "music", "location": "Lagos", + "city": "Lagos", "date": this_saturday.isoformat(), "price": 5000.0, "capacity": 500, @@ -47,6 +48,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Annual jazz festival featuring international and local artists", "event_type": "music", "location": "Lagos", + "city": "Lagos", "date": this_sunday.isoformat(), "price": 8000.0, "capacity": 1000, @@ -57,6 +59,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Rock music concert with live bands", "event_type": "music", "location": "Lagos", + "city": "Lagos", "date": next_week.isoformat(), "price": 6000.0, "capacity": 800, @@ -68,6 +71,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Multi-genre music festival in the capital", "event_type": "music", "location": "Abuja", + "city": "Abuja", "date": this_saturday.isoformat(), "price": 7000.0, "capacity": 1500, @@ -78,6 +82,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Intimate jazz performance", "event_type": "music", "location": "Port Harcourt", + "city": "Port Harcourt", "date": next_month.isoformat(), "price": 4000.0, "capacity": 300, @@ -89,6 +94,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Annual marathon event through Lagos city", "event_type": "sports", "location": "Lagos", + "city": "Lagos", "date": this_sunday.isoformat(), "price": 2000.0, "capacity": 5000, @@ -99,6 +105,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Exciting football match between city rivals", "event_type": "sports", "location": "Lagos", + "city": "Lagos", "date": this_saturday.isoformat(), "price": 3000.0, "capacity": 2000, @@ -109,6 +116,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Inter-state basketball championship", "event_type": "sports", "location": "Abuja", + "city": "Abuja", "date": next_week.isoformat(), "price": 1500.0, "capacity": 1000, @@ -120,6 +128,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Leading technology conference in West Africa", "event_type": "tech", "location": "Lagos", + "city": "Lagos", "date": next_week.isoformat(), "price": 15000.0, "capacity": 500, @@ -130,6 +139,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Startups pitching to investors", "event_type": "tech", "location": "Lagos", + "city": "Lagos", "date": this_saturday.isoformat(), "price": 5000.0, "capacity": 200, @@ -140,6 +150,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Conference on artificial intelligence and ML", "event_type": "conference", "location": "Abuja", + "city": "Abuja", "date": next_month.isoformat(), "price": 20000.0, "capacity": 300, @@ -151,6 +162,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Contemporary African art showcase", "event_type": "art", "location": "Lagos", + "city": "Lagos", "date": this_sunday.isoformat(), "price": 2500.0, "capacity": 150, @@ -161,6 +173,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Traditional Nigerian dance performances", "event_type": "culture", "location": "Lagos", + "city": "Lagos", "date": next_week.isoformat(), "price": 3000.0, "capacity": 400, @@ -172,6 +185,7 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Culinary experience with top chefs", "event_type": "food", "location": "Lagos", + "city": "Lagos", "date": this_saturday.isoformat(), "price": 4000.0, "capacity": 600, @@ -182,10 +196,34 @@ def get_mock_events() -> List[Dict[str, Any]]: "description": "Stand-up comedy with famous comedians", "event_type": "entertainment", "location": "Lagos", + "city": "Lagos", "date": this_sunday.isoformat(), "price": 3500.0, "capacity": 300, }, + # Events in extended cities (Owerri, Warri) + { + "id": "evt_016", + "name": "Owerri Cultural Night", + "description": "A night of cultural celebration in Owerri", + "event_type": "culture", + "location": "Owerri", + "city": "Owerri", + "date": next_week.isoformat(), + "price": 2000.0, + "capacity": 400, + }, + { + "id": "evt_017", + "name": "Warri Comedy Fiesta", + "description": "Stand-up comedy show in Warri", + "event_type": "entertainment", + "location": "Warri", + "city": "Warri", + "date": this_saturday.isoformat(), + "price": 3000.0, + "capacity": 350, + }, ] return mock_events \ No newline at end of file diff --git a/src/recommender.py b/src/recommender.py new file mode 100644 index 0000000..78a1617 --- /dev/null +++ b/src/recommender.py @@ -0,0 +1,127 @@ +"""Item-based collaborative filtering recommender. + +Implements cosine-similarity between event co-occurrence vectors using pure +Python (no numpy) as required by issue #159. +""" +import math +from typing import Dict, List, Optional + + +def build_item_similarity_matrix( + user_events_dict: Dict[str, List[str]], +) -> Dict[str, Dict[str, float]]: + """Build an item-item cosine similarity matrix from purchase history. + + Args: + user_events_dict: Mapping of user_id → list of purchased event IDs. + + Returns: + Nested dict similarity[event_a][event_b] = cosine similarity score. + """ + all_events: List[str] = sorted( + {event for events in user_events_dict.values() for event in events} + ) + + # Each event's co-occurrence vector: user_id → 1 if purchased, else absent. + event_vectors: Dict[str, Dict[str, int]] = { + event: { + user: 1 + for user, events in user_events_dict.items() + if event in events + } + for event in all_events + } + + def _cosine(v1: Dict[str, int], v2: Dict[str, int]) -> float: + dot = sum(v1.get(u, 0) * v2.get(u, 0) for u in set(v1) | set(v2)) + mag1 = math.sqrt(sum(x * x for x in v1.values())) + mag2 = math.sqrt(sum(x * x for x in v2.values())) + if mag1 == 0.0 or mag2 == 0.0: + return 0.0 + return dot / (mag1 * mag2) + + similarity: Dict[str, Dict[str, float]] = {e: {} for e in all_events} + for i, e1 in enumerate(all_events): + for e2 in all_events[i + 1 :]: + sim = _cosine(event_vectors[e1], event_vectors[e2]) + similarity[e1][e2] = sim + similarity[e2][e1] = sim + + return similarity + + +def get_item_recommendations( + user_id: str, + user_events_dict: Dict[str, List[str]], + similarity_matrix: Dict[str, Dict[str, float]], + top_n: int = 3, +) -> List[str]: + """Return top-N item-based recommendations for a user. + + Cold-start: if the user has no purchase history, returns the 3 most + popular events (highest total purchase count across all users). + + Args: + user_id: The user to generate recommendations for. + user_events_dict: Full purchase history mapping. + similarity_matrix: Pre-built item similarity matrix. + top_n: Number of recommendations to return. + + Returns: + List of recommended event IDs, ordered by descending score. + """ + user_events: List[str] = user_events_dict.get(user_id, []) + + if not user_events: + # Cold-start — return most popular events the user hasn't purchased. + event_counts: Dict[str, int] = {} + for events in user_events_dict.values(): + for e in events: + event_counts[e] = event_counts.get(e, 0) + 1 + return sorted(event_counts, key=lambda k: event_counts[k], reverse=True)[:top_n] + + # Aggregate similarity scores for unseen events. + user_set = set(user_events) + scores: Dict[str, float] = {} + for purchased in user_set: + for candidate, sim in similarity_matrix.get(purchased, {}).items(): + if candidate not in user_set: + scores[candidate] = scores.get(candidate, 0.0) + sim + + return sorted(scores, key=lambda k: scores[k], reverse=True)[:top_n] + + +def get_user_events_from_db(user_id: Optional[str] = None) -> Dict[str, List[str]]: + """Retrieve user purchase history from the database. + + Falls back to an empty dict when the DB is unavailable so the cold-start + path kicks in. Replace the stub body with a real SQLAlchemy query once + the user_event_purchases table exists. + + Args: + user_id: If provided, only fetch history for this user. + + Returns: + Mapping of user_id → list of purchased event IDs. + """ + try: + from src.config import get_settings + from sqlalchemy import create_engine, text + + engine = create_engine(get_settings().DATABASE_URL, pool_pre_ping=True) + with engine.connect() as conn: + if user_id: + rows = conn.execute( + text("SELECT user_id, event_id FROM user_event_purchases WHERE user_id = :uid"), + {"uid": user_id}, + ).fetchall() + else: + rows = conn.execute( + text("SELECT user_id, event_id FROM user_event_purchases") + ).fetchall() + result: Dict[str, List[str]] = {} + for row in rows: + result.setdefault(row[0], []).append(row[1]) + return result + except Exception: + return {} diff --git a/src/search_utils.py b/src/search_utils.py index c9300ff..5f9d4c4 100644 --- a/src/search_utils.py +++ b/src/search_utils.py @@ -3,6 +3,8 @@ from datetime import datetime, timedelta from typing import Any, Dict, List, Optional +from src.config import get_settings + def extract_keywords(query: str) -> Dict[str, Any]: """Extract keywords from a natural language search query. @@ -41,17 +43,9 @@ def extract_keywords(query: str) -> Dict[str, Any]: matched_keywords.add(kw) break + known_locations_raw = get_settings().KNOWN_LOCATIONS location_keywords: List[str] = [ - "lagos", - "abuja", - "port harcourt", - "kano", - "ibadan", - "benin", - "kaduna", - "jos", - "enugu", - "calabar", + loc.strip() for loc in known_locations_raw.split(",") if loc.strip() ] detected_locations: List[str] = [] @@ -59,6 +53,16 @@ def extract_keywords(query: str) -> Dict[str, Any]: if location in query_lower: detected_locations.append(location.title()) + # Detect potential unknown-city fuzzy candidates — words following a + # location preposition that are not already matched to a known location. + known_lower: set[str] = {loc.lower() for loc in location_keywords} + fuzzy_locations: List[str] = [] + prep_hits = re.findall(r"\b(?:in|at|near)\s+([a-z][a-z ]+?)(?=\s+(?:this|today|tomorrow|next|week|month|weekend)\b|$)", query_lower) + for hit in prep_hits: + hit = hit.strip() + if hit and hit not in known_lower: + fuzzy_locations.append(hit) + time_filter: Optional[str] = None if any(word in query_lower for word in ["today", "tonight"]): time_filter = "today" @@ -102,6 +106,7 @@ def extract_keywords(query: str) -> Dict[str, Any]: return { "event_types": detected_event_types, "locations": detected_locations, + "fuzzy_locations": fuzzy_locations, "time_filter": time_filter, "keywords": general_keywords, "min_price": nlp_min_price, @@ -142,6 +147,8 @@ def filter_events_by_keywords( # No filters — return everything if not any([keywords["event_types"], keywords["locations"], + keywords.get("fuzzy_locations"), keywords["time_filter"], + keywords["keywords"]]): keywords["time_filter"], keywords["keywords"], has_price_capacity_filter]): return events @@ -157,7 +164,20 @@ def filter_events_by_keywords( if keywords["locations"] and matches: event_location: str = str(event.get("location", "")).lower() - if not any(loc.lower() in event_location for loc in keywords["locations"]): + event_city: str = str(event.get("city", "")).lower() + if not any( + loc.lower() in event_location or loc.lower() in event_city + for loc in keywords["locations"] + ): + matches = False + elif keywords.get("fuzzy_locations") and matches: + # Fuzzy match: unknown city — try substring against location/city fields + event_location = str(event.get("location", "")).lower() + event_city = str(event.get("city", "")).lower() + if not any( + floc in event_location or floc in event_city + for floc in keywords["fuzzy_locations"] + ): matches = False if keywords["time_filter"] and matches: diff --git a/tests/test_etl_incremental.py b/tests/test_etl_incremental.py new file mode 100644 index 0000000..aeaf9d4 --- /dev/null +++ b/tests/test_etl_incremental.py @@ -0,0 +1,121 @@ +"""Tests for issue #161: incremental ETL extract using a cursor.""" +import httpx +import pytest + +from src.config import get_settings +from src.etl.extract import extract_events_and_sales + + +def _response(status_code: int, url: str, payload): + request = httpx.Request("GET", url) + return httpx.Response(status_code=status_code, json=payload, request=request) + + +class DummyClient: + def __init__(self, side_effects, calls): + self._side_effects = list(side_effects) + self.calls = calls + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get(self, url, headers=None, params=None): + self.calls.append({"url": url, "headers": headers or {}, "params": params or {}}) + effect = self._side_effects.pop(0) + if isinstance(effect, Exception): + raise effect + return effect + + +def _set_env(monkeypatch): + monkeypatch.setenv("QR_SIGNING_KEY", "a" * 32) + monkeypatch.setenv("DATABASE_URL", "sqlite:///./incr-test.db") + monkeypatch.setenv("NEST_API_BASE_URL", "https://nest.example.test") + get_settings.cache_clear() + + +# --------------------------------------------------------------------------- +# First run — no cursor +# --------------------------------------------------------------------------- + +def test_first_run_no_since_param(monkeypatch): + """When since=None (first run), no ?since param is forwarded to the API.""" + _set_env(monkeypatch) + calls = [] + side_effects = [ + _response(200, "https://nest.example.test/events?page=1", {"data": [{"id": "E1", "name": "Ev"}], "pagination": {"page": 1, "total_pages": 1}}), + _response(200, "https://nest.example.test/ticket-sales", {"data": []}), + ] + monkeypatch.setattr("src.etl.extract.httpx.Client", lambda timeout: DummyClient(side_effects, calls)) + + extract_events_and_sales(since=None) + + assert "since" not in calls[0]["params"] + assert "since" not in calls[1]["params"] + + +# --------------------------------------------------------------------------- +# Subsequent run — cursor used +# --------------------------------------------------------------------------- + +def test_subsequent_run_since_param_forwarded(monkeypatch): + """When since is provided, ?since is forwarded to both /events and /ticket-sales.""" + _set_env(monkeypatch) + since_value = "2025-10-01T00:00:00" + calls = [] + side_effects = [ + _response(200, "https://nest.example.test/events", {"data": [{"id": "E2", "name": "Ev2"}], "pagination": {"page": 1, "total_pages": 1}}), + _response(200, "https://nest.example.test/ticket-sales", {"data": []}), + ] + monkeypatch.setattr("src.etl.extract.httpx.Client", lambda timeout: DummyClient(side_effects, calls)) + + extract_events_and_sales(since=since_value) + + assert calls[0]["params"].get("since") == since_value + assert calls[1]["params"].get("since") == since_value + + +def test_subsequent_run_returns_records_normally(monkeypatch): + """Incremental extract still parses and returns records correctly.""" + _set_env(monkeypatch) + calls = [] + side_effects = [ + _response(200, "https://nest.example.test/events", { + "data": [{"id": "E3", "name": "NewEvent"}], + "pagination": {"page": 1, "total_pages": 1}, + }), + _response(200, "https://nest.example.test/ticket-sales", { + "data": [{"event_id": "E3", "quantity": 5, "price": 20.0, "sale_date": "2025-11-01T00:00:00"}], + }), + ] + monkeypatch.setattr("src.etl.extract.httpx.Client", lambda timeout: DummyClient(side_effects, calls)) + + events, sales = extract_events_and_sales(since="2025-10-01T00:00:00") + + assert len(events) == 1 + assert events[0].event_id == "E3" + assert len(sales) == 1 + assert sales[0].event_id == "E3" + + +# --------------------------------------------------------------------------- +# Failed run — cursor not advanced +# --------------------------------------------------------------------------- + +def test_failed_run_cursor_not_advanced(monkeypatch): + """If extract raises, no cursor update happens (run_etl_once responsibility).""" + _set_env(monkeypatch) + calls = [] + side_effects = [ + _response(500, "https://nest.example.test/events", {}), + _response(500, "https://nest.example.test/events", {}), + _response(500, "https://nest.example.test/events", {}), + ] + monkeypatch.setattr("src.etl.extract.httpx.Client", lambda timeout: DummyClient(side_effects, calls)) + monkeypatch.setattr("src.etl.extract.time.sleep", lambda s: None) + + with pytest.raises(Exception): + extract_events_and_sales(since="2025-10-01T00:00:00") diff --git a/tests/test_etl_validation.py b/tests/test_etl_validation.py new file mode 100644 index 0000000..f42c5c6 --- /dev/null +++ b/tests/test_etl_validation.py @@ -0,0 +1,156 @@ +"""Tests for issue #162: ETL validate_rows() step.""" +from datetime import date, datetime, timedelta, timezone + +from src.etl import validate_rows + + +def _future_date(days_ahead: int = 2) -> date: + return datetime.now(tz=timezone.utc).date() + timedelta(days=days_ahead) + + +def _past_date(days_ago: int = 1) -> date: + return datetime.now(tz=timezone.utc).date() - timedelta(days=days_ago) + + +# --------------------------------------------------------------------------- +# Valid rows pass through unchanged +# --------------------------------------------------------------------------- + +def test_valid_rows_pass_through(): + ev_rows = [{"event_id": "E1", "total_tickets": 10, "total_revenue": 100.0}] + daily_rows = [{"event_id": "E1", "sale_date": _past_date()}] + + valid_ev, valid_daily, rejected = validate_rows(ev_rows, daily_rows) + + assert valid_ev == ev_rows + assert valid_daily == daily_rows + assert rejected == 0 + + +# --------------------------------------------------------------------------- +# event_summary rows — rejections +# --------------------------------------------------------------------------- + +def test_rejects_event_row_with_empty_event_id(): + ev_rows = [{"event_id": "", "total_tickets": 5, "total_revenue": 50.0}] + valid_ev, _, rejected = validate_rows(ev_rows, []) + assert valid_ev == [] + assert rejected == 1 + + +def test_rejects_event_row_with_none_event_id(): + ev_rows = [{"event_id": None, "total_tickets": 5, "total_revenue": 50.0}] + valid_ev, _, rejected = validate_rows(ev_rows, []) + assert valid_ev == [] + assert rejected == 1 + + +def test_rejects_event_row_with_negative_total_tickets(): + ev_rows = [{"event_id": "E2", "total_tickets": -1, "total_revenue": 50.0}] + valid_ev, _, rejected = validate_rows(ev_rows, []) + assert valid_ev == [] + assert rejected == 1 + + +def test_rejects_event_row_with_negative_total_revenue(): + ev_rows = [{"event_id": "E3", "total_tickets": 5, "total_revenue": -0.01}] + valid_ev, _, rejected = validate_rows(ev_rows, []) + assert valid_ev == [] + assert rejected == 1 + + +def test_accepts_event_row_with_zero_tickets_and_revenue(): + """Zero is valid — only negative values are rejected.""" + ev_rows = [{"event_id": "E4", "total_tickets": 0, "total_revenue": 0.0}] + valid_ev, _, rejected = validate_rows(ev_rows, []) + assert len(valid_ev) == 1 + assert rejected == 0 + + +# --------------------------------------------------------------------------- +# daily_rows — rejections +# --------------------------------------------------------------------------- + +def test_rejects_daily_row_with_empty_event_id(): + daily_rows = [{"event_id": "", "sale_date": _past_date()}] + _, valid_daily, rejected = validate_rows([], daily_rows) + assert valid_daily == [] + assert rejected == 1 + + +def test_rejects_daily_row_with_future_sale_date(): + daily_rows = [{"event_id": "E5", "sale_date": _future_date(days_ahead=2)}] + _, valid_daily, rejected = validate_rows([], daily_rows) + assert valid_daily == [] + assert rejected == 1 + + +def test_accepts_daily_row_with_today_sale_date(): + """A sale_date of today (0 days ahead) is within the 1-day tolerance.""" + today = datetime.now(tz=timezone.utc).date() + daily_rows = [{"event_id": "E6", "sale_date": today}] + _, valid_daily, rejected = validate_rows([], daily_rows) + assert len(valid_daily) == 1 + assert rejected == 0 + + +def test_accepts_daily_row_with_tomorrow_sale_date(): + """A sale_date of tomorrow (1 day ahead) is at the boundary — accepted.""" + tomorrow = datetime.now(tz=timezone.utc).date() + timedelta(days=1) + daily_rows = [{"event_id": "E7", "sale_date": tomorrow}] + _, valid_daily, rejected = validate_rows([], daily_rows) + assert len(valid_daily) == 1 + assert rejected == 0 + + +def test_accepts_daily_row_with_past_sale_date(): + daily_rows = [{"event_id": "E8", "sale_date": _past_date()}] + _, valid_daily, rejected = validate_rows([], daily_rows) + assert len(valid_daily) == 1 + assert rejected == 0 + + +def test_daily_row_sale_date_as_iso_string(): + """sale_date passed as ISO string is parsed and validated correctly.""" + past_iso = (_past_date()).isoformat() + daily_rows = [{"event_id": "E9", "sale_date": past_iso}] + _, valid_daily, rejected = validate_rows([], daily_rows) + assert len(valid_daily) == 1 + assert rejected == 0 + + +def test_daily_row_future_date_as_iso_string_rejected(): + future_iso = (_future_date(days_ahead=3)).isoformat() + daily_rows = [{"event_id": "E10", "sale_date": future_iso}] + _, valid_daily, rejected = validate_rows([], daily_rows) + assert valid_daily == [] + assert rejected == 1 + + +# --------------------------------------------------------------------------- +# Mixed batches +# --------------------------------------------------------------------------- + +def test_mixed_batch_correct_rejected_count(): + ev_rows = [ + {"event_id": "E1", "total_tickets": 10, "total_revenue": 100.0}, # valid + {"event_id": "", "total_tickets": 5, "total_revenue": 50.0}, # invalid + {"event_id": "E3", "total_tickets": -2, "total_revenue": 20.0}, # invalid + ] + daily_rows = [ + {"event_id": "E1", "sale_date": _past_date()}, # valid + {"event_id": "E1", "sale_date": _future_date(days_ahead=5)}, # invalid + ] + valid_ev, valid_daily, rejected = validate_rows(ev_rows, daily_rows) + + assert len(valid_ev) == 1 + assert valid_ev[0]["event_id"] == "E1" + assert len(valid_daily) == 1 + assert rejected == 3 + + +def test_empty_inputs_return_zero_rejected(): + valid_ev, valid_daily, rejected = validate_rows([], []) + assert valid_ev == [] + assert valid_daily == [] + assert rejected == 0 diff --git a/tests/test_recommend.py b/tests/test_recommend.py index 3125967..4b1f2ec 100644 --- a/tests/test_recommend.py +++ b/tests/test_recommend.py @@ -11,8 +11,11 @@ def test_recommend_events_valid_user(): assert isinstance(data["recommendations"], list) assert len(data["recommendations"]) <= 3 -def test_recommend_events_user_not_found(): - response = client.post("/recommend-events", json={"user_id": "unknown"}) - assert response.status_code == 404 - data = response.json() - assert data["detail"] == "User not found" \ No newline at end of file +def test_recommend_events_cold_start_unknown_user(): + """Unknown user gets cold-start recommendations (most popular events), not 404.""" + response = client.post("/recommend-events", json={"user_id": "unknown"}) + assert response.status_code == 200 + data = response.json() + assert "recommendations" in data + assert isinstance(data["recommendations"], list) + assert len(data["recommendations"]) <= 3 \ No newline at end of file diff --git a/tests/test_recommender.py b/tests/test_recommender.py new file mode 100644 index 0000000..054bdc9 --- /dev/null +++ b/tests/test_recommender.py @@ -0,0 +1,128 @@ +"""Unit tests for src/recommender.py (issue #159).""" +from src.recommender import build_item_similarity_matrix, get_item_recommendations + + +SAMPLE_HISTORY = { + "user1": ["concert_A", "concert_B"], + "user2": ["concert_B", "concert_C"], + "user3": ["concert_A", "concert_C", "concert_D"], + "user4": ["concert_D", "concert_E"], +} + + +# --------------------------------------------------------------------------- +# build_item_similarity_matrix +# --------------------------------------------------------------------------- + +def test_similarity_matrix_contains_all_event_pairs(): + matrix = build_item_similarity_matrix(SAMPLE_HISTORY) + all_events = {"concert_A", "concert_B", "concert_C", "concert_D", "concert_E"} + assert set(matrix.keys()) == all_events + for event, sims in matrix.items(): + others = all_events - {event} + assert set(sims.keys()) == others + + +def test_similarity_is_symmetric(): + matrix = build_item_similarity_matrix(SAMPLE_HISTORY) + assert abs(matrix["concert_A"]["concert_B"] - matrix["concert_B"]["concert_A"]) < 1e-9 + + +def test_similarity_range(): + """All similarity values must be in [0, 1].""" + matrix = build_item_similarity_matrix(SAMPLE_HISTORY) + for sims in matrix.values(): + for score in sims.values(): + assert 0.0 <= score <= 1.0 + 1e-9 + + +def test_identical_events_would_have_similarity_one(): + """Two events purchased by the same set of users are perfectly similar.""" + history = { + "alice": ["X", "Y"], + "bob": ["X", "Y"], + } + matrix = build_item_similarity_matrix(history) + assert abs(matrix["X"]["Y"] - 1.0) < 1e-9 + + +def test_disjoint_events_have_zero_similarity(): + """Events with no shared buyers have cosine similarity 0.""" + history = { + "alice": ["X"], + "bob": ["Y"], + } + matrix = build_item_similarity_matrix(history) + assert matrix["X"]["Y"] == 0.0 + + +# --------------------------------------------------------------------------- +# get_item_recommendations — normal path +# --------------------------------------------------------------------------- + +def test_recommendations_exclude_already_purchased(): + matrix = build_item_similarity_matrix(SAMPLE_HISTORY) + recs = get_item_recommendations("user1", SAMPLE_HISTORY, matrix, top_n=3) + user_events = set(SAMPLE_HISTORY["user1"]) + assert all(r not in user_events for r in recs) + + +def test_recommendations_count_at_most_top_n(): + matrix = build_item_similarity_matrix(SAMPLE_HISTORY) + recs = get_item_recommendations("user1", SAMPLE_HISTORY, matrix, top_n=3) + assert len(recs) <= 3 + + +def test_recommendations_top3_correct_for_user1(): + """user1 owns A+B; B co-occurs with C, A co-occurs with C+D → C should rank highly.""" + matrix = build_item_similarity_matrix(SAMPLE_HISTORY) + recs = get_item_recommendations("user1", SAMPLE_HISTORY, matrix, top_n=3) + # concert_C is purchased by both user2 (shares B with user1) and user3 (shares A) + assert "concert_C" in recs + + +# --------------------------------------------------------------------------- +# get_item_recommendations — cold-start path +# --------------------------------------------------------------------------- + +def test_cold_start_unknown_user_returns_popular_events(): + matrix = build_item_similarity_matrix(SAMPLE_HISTORY) + recs = get_item_recommendations("nobody", SAMPLE_HISTORY, matrix, top_n=3) + assert len(recs) <= 3 + # concert_D and concert_B appear in multiple users — should be in top results + assert len(recs) > 0 + + +def test_cold_start_empty_history_user_returns_popular(): + """User present in dict but with empty history falls back to popular.""" + history = dict(SAMPLE_HISTORY) + history["new_user"] = [] + matrix = build_item_similarity_matrix(history) + recs = get_item_recommendations("new_user", history, matrix, top_n=3) + assert isinstance(recs, list) + assert len(recs) <= 3 + + +def test_cold_start_popular_order(): + """Cold-start should return events by descending purchase frequency.""" + history = { + "u1": ["A", "B", "C"], + "u2": ["A", "B"], + "u3": ["A"], + } + matrix = build_item_similarity_matrix(history) + recs = get_item_recommendations("stranger", history, matrix, top_n=3) + # A appears 3 times, B 2 times, C 1 time + assert recs[0] == "A" + assert recs[1] == "B" + assert recs[2] == "C" + + +# --------------------------------------------------------------------------- +# Empty / edge cases +# --------------------------------------------------------------------------- + +def test_empty_history_returns_empty(): + matrix = build_item_similarity_matrix({}) + recs = get_item_recommendations("user1", {}, matrix, top_n=3) + assert recs == [] diff --git a/tests/test_search_location.py b/tests/test_search_location.py new file mode 100644 index 0000000..321b544 --- /dev/null +++ b/tests/test_search_location.py @@ -0,0 +1,135 @@ +"""Tests for issue #157: expanded location detection in extract_keywords.""" +import os +import pytest + +os.environ.setdefault("QR_SIGNING_KEY", "a" * 32) +os.environ.setdefault("DATABASE_URL", "sqlite:///./test-search-location.db") +os.environ.setdefault("NEST_API_BASE_URL", "https://nest.example.test") + + +from src.config import get_settings +from src.search_utils import extract_keywords, filter_events_by_keywords + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +SAMPLE_EVENTS = [ + {"id": "e1", "name": "Lagos Jazz", "description": "Jazz night", "event_type": "music", "location": "Lagos", "city": "Lagos", "date": "2099-01-01"}, + {"id": "e2", "name": "Owerri Cultural Night", "description": "Culture event", "event_type": "culture", "location": "Owerri", "city": "Owerri", "date": "2099-01-01"}, + {"id": "e3", "name": "Warri Comedy", "description": "Comedy show", "event_type": "entertainment", "location": "Warri", "city": "Warri", "date": "2099-01-01"}, + {"id": "e4", "name": "Abuja Summit", "description": "Tech summit", "event_type": "conference", "location": "Abuja", "city": "Abuja", "date": "2099-01-01"}, +] + + +# --------------------------------------------------------------------------- +# Test: known city exact match +# --------------------------------------------------------------------------- + +def test_known_city_exact_match_lagos(monkeypatch): + """Known city 'lagos' should appear in detected_locations.""" + monkeypatch.setenv("QR_SIGNING_KEY", "a" * 32) + monkeypatch.setenv("DATABASE_URL", "sqlite:///./test.db") + monkeypatch.setenv("NEST_API_BASE_URL", "https://nest.example.test") + get_settings.cache_clear() + + result = extract_keywords("music events in Lagos") + assert "Lagos" in result["locations"] + assert result["fuzzy_locations"] == [] + + +def test_known_city_exact_match_owerri(monkeypatch): + """Owerri is in the expanded default KNOWN_LOCATIONS.""" + monkeypatch.setenv("QR_SIGNING_KEY", "a" * 32) + monkeypatch.setenv("DATABASE_URL", "sqlite:///./test.db") + monkeypatch.setenv("NEST_API_BASE_URL", "https://nest.example.test") + get_settings.cache_clear() + + result = extract_keywords("events in Owerri this weekend") + assert "Owerri" in result["locations"] + assert result["fuzzy_locations"] == [] + + +def test_known_city_filter_returns_matching_events(monkeypatch): + """filter_events_by_keywords should return only Lagos events when Lagos is detected.""" + monkeypatch.setenv("QR_SIGNING_KEY", "a" * 32) + monkeypatch.setenv("DATABASE_URL", "sqlite:///./test.db") + monkeypatch.setenv("NEST_API_BASE_URL", "https://nest.example.test") + get_settings.cache_clear() + + kw = extract_keywords("events in Lagos") + filtered = filter_events_by_keywords(SAMPLE_EVENTS, kw) + assert all(e["city"] == "Lagos" for e in filtered) + assert len(filtered) == 1 + + +# --------------------------------------------------------------------------- +# Test: unknown city fuzzy match +# --------------------------------------------------------------------------- + +def test_unknown_city_fuzzy_match_detected(monkeypatch): + """A city not in KNOWN_LOCATIONS should land in fuzzy_locations.""" + monkeypatch.setenv("QR_SIGNING_KEY", "a" * 32) + monkeypatch.setenv("DATABASE_URL", "sqlite:///./test.db") + monkeypatch.setenv("NEST_API_BASE_URL", "https://nest.example.test") + monkeypatch.setenv("KNOWN_LOCATIONS", "lagos,abuja") # exclude warri + get_settings.cache_clear() + + result = extract_keywords("comedy events in Warri") + assert "warri" in result["fuzzy_locations"] + assert result["locations"] == [] + + +def test_unknown_city_fuzzy_filter_returns_matching_events(monkeypatch): + """filter_events_by_keywords should fuzzy-match Warri events when Warri is unknown.""" + monkeypatch.setenv("QR_SIGNING_KEY", "a" * 32) + monkeypatch.setenv("DATABASE_URL", "sqlite:///./test.db") + monkeypatch.setenv("NEST_API_BASE_URL", "https://nest.example.test") + monkeypatch.setenv("KNOWN_LOCATIONS", "lagos,abuja") # exclude warri + get_settings.cache_clear() + + kw = extract_keywords("events in Warri") + filtered = filter_events_by_keywords(SAMPLE_EVENTS, kw) + assert any(e["city"] == "Warri" for e in filtered) + assert all(e["city"] == "Warri" for e in filtered) + + +# --------------------------------------------------------------------------- +# Test: no location in query +# --------------------------------------------------------------------------- + +def test_no_location_in_query_returns_all(monkeypatch): + """When no location is specified, all events are returned.""" + monkeypatch.setenv("QR_SIGNING_KEY", "a" * 32) + monkeypatch.setenv("DATABASE_URL", "sqlite:///./test.db") + monkeypatch.setenv("NEST_API_BASE_URL", "https://nest.example.test") + get_settings.cache_clear() + + kw = extract_keywords("music events this weekend") + assert kw["locations"] == [] + assert kw["fuzzy_locations"] == [] + # All events pass the location filter (only event_type / time_filter may narrow results) + music_events = [e for e in SAMPLE_EVENTS if e["event_type"] == "music"] + filtered = filter_events_by_keywords(music_events, kw) + assert len(filtered) == len(music_events) + + +# --------------------------------------------------------------------------- +# Test: KNOWN_LOCATIONS env var is respected +# --------------------------------------------------------------------------- + +def test_known_locations_env_var_respected(monkeypatch): + """Setting KNOWN_LOCATIONS env var changes detected cities.""" + monkeypatch.setenv("QR_SIGNING_KEY", "a" * 32) + monkeypatch.setenv("DATABASE_URL", "sqlite:///./test.db") + monkeypatch.setenv("NEST_API_BASE_URL", "https://nest.example.test") + monkeypatch.setenv("KNOWN_LOCATIONS", "ibadan,jos") + get_settings.cache_clear() + + result = extract_keywords("events in Ibadan") + assert "Ibadan" in result["locations"] + + result2 = extract_keywords("events in Lagos") + assert "Lagos" not in result2["locations"] + assert "lagos" in result2["fuzzy_locations"]