Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class Settings(BaseSettings):

SERVICE_API_KEY: str = "default_service_secret_change_me"
ADMIN_API_KEY: str = "default_admin_secret_change_me"
KNOWN_LOCATIONS: str = (
"lagos,abuja,port harcourt,kano,ibadan,benin,kaduna,jos,enugu,calabar,"
"owerri,warri,uyo,akure,ilorin,sokoto,zaria,maiduguri,asaba,nnewi"
)

class Config:
env_file = ".env"
Expand Down
220 changes: 204 additions & 16 deletions src/etl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from datetime import date, datetime
from datetime import date, datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy import (
Expand All @@ -11,6 +11,7 @@
String,
Table,
TIMESTAMP,
create_engine,
text,
)
from sqlalchemy.dialects.postgresql import insert as pg_insert
Expand All @@ -21,7 +22,7 @@
except Exception:
bigquery = None # Optional dependency

from src.logging_config import ETL_JOBS_TOTAL, log_error, log_info
from src.logging_config import ETL_JOBS_TOTAL, log_error, log_info, log_warning
from src.config import get_settings
import src.db as _db
from .extract import extract_events_and_sales
Expand All @@ -47,6 +48,152 @@ def _safe_float(x: Any) -> float:
return 0.0


# ---------------------------------------------------------------------------
# Validation (issue #162)
# ---------------------------------------------------------------------------

def validate_rows(
    event_summary_rows: List[Dict[str, Any]],
    daily_rows: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], int]:
    """Reject malformed rows before they reach the database.

    Rules:
    - event_summary row: ``event_id`` must be non-empty.
    - event_summary row: ``total_tickets`` and ``total_revenue`` must be >= 0.
    - daily row: ``event_id`` must be non-empty.
    - daily row: ``sale_date`` must not be more than 1 day in the future.

    Every rejected row is logged as a warning so it remains traceable.

    Args:
        event_summary_rows: Per-event aggregate rows produced by the transform step.
        daily_rows: Per-event/per-day rows; ``sale_date`` may be an ISO-8601
            string, a ``datetime``, a ``date``, or ``None`` (unparseable
            strings are treated as ``None`` and therefore pass the check).

    Returns:
        (valid_event_rows, valid_daily_rows, rejected_count)
    """
    # Local import: ``timedelta`` is not part of this module's top-level
    # datetime imports (fixes the previous inline ``__import__`` hack).
    from datetime import timedelta

    rejected_count = 0
    valid_event_rows: List[Dict[str, Any]] = []
    today = datetime.now(tz=timezone.utc).date()
    # Loop-invariant cutoff: anything later than tomorrow (UTC) is "in the future".
    max_allowed_date = today + timedelta(days=1)

    for row in event_summary_rows:
        event_id = row.get("event_id")
        if not event_id:
            log_warning("ETL validate: rejected event row — empty event_id", {"row": row})
            rejected_count += 1
            continue
        if _safe_int(row.get("total_tickets", 0)) < 0:
            log_warning(
                "ETL validate: rejected event row — negative total_tickets",
                {"row": row},
            )
            rejected_count += 1
            continue
        if _safe_float(row.get("total_revenue", 0.0)) < 0:
            log_warning(
                "ETL validate: rejected event row — negative total_revenue",
                {"row": row},
            )
            rejected_count += 1
            continue
        valid_event_rows.append(row)

    valid_daily_rows: List[Dict[str, Any]] = []
    for row in daily_rows:
        event_id = row.get("event_id")
        if not event_id:
            log_warning("ETL validate: rejected daily row — empty event_id", {"row": row})
            rejected_count += 1
            continue
        sale_date = row.get("sale_date")
        if sale_date is not None:
            # Normalise strings/datetimes down to a plain ``date`` for comparison.
            if isinstance(sale_date, str):
                try:
                    sale_date = datetime.fromisoformat(sale_date).date()
                except Exception:
                    sale_date = None  # unparseable dates are not rejected here
            if isinstance(sale_date, datetime):
                sale_date = sale_date.date()
            if isinstance(sale_date, date) and sale_date > max_allowed_date:
                log_warning(
                    "ETL validate: rejected daily row — sale_date is in the future",
                    {"row": row},
                )
                rejected_count += 1
                continue
        valid_daily_rows.append(row)

    return valid_event_rows, valid_daily_rows, rejected_count


# ---------------------------------------------------------------------------
# ETL run log cursor helpers (issue #161)
# ---------------------------------------------------------------------------

_ETL_RUN_LOG_TABLE = "etl_run_log"


def _ensure_run_log_table(engine: Engine) -> Table:
    """Idempotently create the ``etl_run_log`` table and return its ``Table``.

    The table records one row per ETL run: start/end timestamps, the final
    status, the cursor timestamp, and how many rows validation rejected.
    """
    meta = MetaData()
    columns = (
        Column("id", Integer, primary_key=True, autoincrement=True),
        Column("started_at", TIMESTAMP(timezone=False)),
        Column("finished_at", TIMESTAMP(timezone=False)),
        Column("status", String),  # 'success' | 'failed'
        Column("last_run_at", TIMESTAMP(timezone=False)),
        Column("rejected_count", Integer),
    )
    run_log = Table(_ETL_RUN_LOG_TABLE, meta, *columns)
    # create_all is a no-op when the table already exists.
    with engine.begin() as conn:
        meta.create_all(conn)  # type: ignore[arg-type]
    return run_log


def _load_last_successful_cursor(engine: Engine) -> Optional[str]:
    """Return the ISO-8601 ``finished_at`` of the last successful ETL run, or None."""
    query = text(
        f"SELECT finished_at FROM {_ETL_RUN_LOG_TABLE} "  # noqa: S608
        "WHERE status = 'success' "
        "ORDER BY finished_at DESC "
        "LIMIT 1"
    )
    try:
        with engine.connect() as conn:
            result = conn.execute(query).fetchone()
        if result and result[0]:
            finished_at = result[0]
            if isinstance(finished_at, datetime):
                return finished_at.isoformat()
            return str(finished_at)
    except Exception as exc:
        # A missing/unreadable cursor is non-fatal: the caller falls back
        # to a full extract.
        logger.warning("Could not read ETL run log cursor: %s", exc)
    return None


def _save_run_log(
    engine: Engine,
    run_log_table: Table,
    started_at: datetime,
    finished_at: datetime,
    status: str,
    rejected_count: int = 0,
) -> None:
    """Insert a row into etl_run_log."""
    try:
        # Only a successful run advances the incremental-extract cursor.
        cursor_ts = finished_at if status == "success" else None
        stmt = run_log_table.insert().values(
            started_at=started_at,
            finished_at=finished_at,
            status=status,
            last_run_at=cursor_ts,
            rejected_count=rejected_count,
        )
        with engine.begin() as conn:
            conn.execute(stmt)
    except Exception as exc:
        # Best-effort bookkeeping: a failed log write must not fail the run.
        logger.error("Failed to write ETL run log: %s", exc)


def transform_summary(
events: List[Dict[str, Any]],
sales: List[Dict[str, Any]],
Expand Down Expand Up @@ -381,19 +528,60 @@ def diff_etl_output(
# ---------------------------------------------------------------------------

def run_etl_once() -> None:
"""Run a single full ETL cycle: extract → transform → load."""
"""Run a single full ETL cycle: extract → transform → validate → load."""
started_at = datetime.utcnow()
log_info("ETL job started")
events, sales = extract_events_and_sales()
ev_rows, daily_rows = transform_summary(
[event.raw for event in events],
[sale.raw for sale in sales],
)
try:
load_postgres(ev_rows, daily_rows)
except Exception as exc:
log_error("Postgres load failed", {"error": str(exc)})

# --- Cursor: load last successful run timestamp (issue #161) ---
engine = _pg_engine()
run_log_table: Optional[Table] = None
since: Optional[str] = None
if engine is not None:
try:
run_log_table = _ensure_run_log_table(engine)
since = _load_last_successful_cursor(engine)
except Exception as exc:
log_error("Failed to initialise ETL run log", {"error": str(exc)})

log_info("ETL extract cursor", {"since": since})

rejected_count = 0
status = "failed"
try:
load_bigquery(ev_rows, daily_rows)
except Exception as exc:
log_error("BigQuery load failed", {"error": str(exc)})
log_info("ETL job completed")
events, sales = extract_events_and_sales(since=since)
ev_rows, daily_rows = transform_summary(
[event.raw for event in events],
[sale.raw for sale in sales],
)

# --- Validation step (issue #162) ---
ev_rows, daily_rows, rejected_count = validate_rows(ev_rows, daily_rows)
if rejected_count:
log_warning(
"ETL validation rejected rows",
{"rejected_count": rejected_count},
)

try:
load_postgres(ev_rows, daily_rows)
except Exception as exc:
log_error("Postgres load failed", {"error": str(exc)})
raise
try:
load_bigquery(ev_rows, daily_rows)
except Exception as exc:
log_error("BigQuery load failed", {"error": str(exc)})

status = "success"
finally:
finished_at = datetime.utcnow()
if engine is not None and run_log_table is not None:
_save_run_log(
engine,
run_log_table,
started_at=started_at,
finished_at=finished_at,
status=status,
rejected_count=rejected_count,
)
log_info("ETL job completed", {"status": status, "rejected_count": rejected_count})
23 changes: 19 additions & 4 deletions src/etl/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,22 +172,36 @@ def _to_ticket_sale_record(item: Dict[str, Any]) -> TicketSaleRecord:
)


def extract_events_and_sales() -> Tuple[List[EventRecord], List[TicketSaleRecord]]:
"""Fetch all events (paginated) and all ticket-sales from the upstream API."""
def extract_events_and_sales(
since: Optional[str] = None,
) -> Tuple[List[EventRecord], List[TicketSaleRecord]]:
"""Fetch events and ticket-sales from the upstream API.

Args:
since: Optional ISO-8601 datetime string. When provided, the
``?since=`` query parameter is forwarded to both the ``/events``
and ``/ticket-sales`` endpoints so that only records created after
that timestamp are returned (incremental extract). On the very
first run (no previous successful cursor) pass ``None`` to fetch
all records.
"""
settings = get_settings()
base_url = settings.NEST_API_BASE_URL.rstrip("/")
headers = _auth_headers()
events: List[EventRecord] = []

since_params: Dict[str, Any] = {"since": since} if since else {}

with httpx.Client(timeout=REQUEST_TIMEOUT_SECONDS) as client:
page = 1
while True:
params: Dict[str, Any] = {"page": page, **since_params}
response = _request_with_retry(
client=client,
url=f"{base_url}/events",
headers=headers,
dataset="events",
params={"page": page},
params=params,
)
payload: Any = response.json()
items = _normalize_items(payload)
Expand All @@ -202,13 +216,14 @@ def extract_events_and_sales() -> Tuple[List[EventRecord], List[TicketSaleRecord
url=f"{base_url}/ticket-sales",
headers=headers,
dataset="ticket-sales",
params=since_params if since_params else None,
)
sales_payload: Any = sales_response.json()
sales_items = _normalize_items(sales_payload)
sales = [_to_ticket_sale_record(item) for item in sales_items]

log_info(
"ETL extract completed",
{"events_count": len(events), "sales_count": len(sales)},
{"events_count": len(events), "sales_count": len(sales), "since": since},
)
return events, sales
28 changes: 16 additions & 12 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
RevenueShareConfig,
)
from src.revenue_sharing_service import revenue_sharing_service
from src.recommender import (
build_item_similarity_matrix,
get_item_recommendations,
get_user_events_from_db,
)
from src.search_utils import extract_keywords, filter_events_by_keywords
from src.types_custom import (
AnalyticsInvalidAttemptsResponse,
Expand Down Expand Up @@ -408,18 +413,17 @@ def search_events(payload: SearchEventsRequest) -> Any:
@app.post("/recommend-events", response_model=RecommendResponse)
def recommend_events(payload: RecommendRequest) -> RecommendResponse:
user_id = payload.user_id
if user_id not in mock_user_events:
raise HTTPException(status_code=404, detail={"message": "User not found"})
user_evts: set[str] = set(mock_user_events[user_id])
scores: Dict[str, int] = {}
for other_user, events in mock_user_events.items():
if other_user == user_id:
continue
overlap = len(user_evts.intersection(events))
for e in events:
if e not in user_evts:
scores[e] = scores.get(e, 0) + overlap
recommended = sorted(scores, key=lambda k: scores[k], reverse=True)[:3]
# Prefer DB-sourced history; fall back to mock data when DB is unavailable.
user_events_dict = get_user_events_from_db()
if not user_events_dict:
user_events_dict = mock_user_events
similarity_matrix = build_item_similarity_matrix(user_events_dict)
recommended = get_item_recommendations(
user_id=user_id,
user_events_dict=user_events_dict,
similarity_matrix=similarity_matrix,
top_n=3,
)
return RecommendResponse(recommendations=recommended)


Expand Down
Loading
Loading