Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 8 additions & 14 deletions src/analytics/models.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""Analytics models for tracking ticket scans, transfers, and invalid attempts."""
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Boolean, Text, Index
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

from src.config import get_settings
import src.db as _db


Base = declarative_base()
Expand Down Expand Up @@ -96,24 +96,18 @@ class AnalyticsStats(Base):
)


def get_database_url():
"""Get database URL from centralized settings."""
return get_settings().DATABASE_URL


def get_engine():
    """Return the shared database engine from src.db.

    May return None when DATABASE_URL is not configured (src.db skips
    engine creation in that case).
    """
    # Delegates to the centralised engine singleton instead of creating
    # a new engine per call.
    return _db.get_engine()


def get_session():
    """Return a new database session from src.db.

    May return None when the database is not configured.
    """
    # Delegates to the centralised session factory; no per-call
    # sessionmaker construction here anymore.
    return _db.get_session()


def init_db():
    """Create all analytics tables on the shared engine.

    No-op when the database is not configured (engine is None).
    """
    engine = get_engine()
    if engine is not None:
        Base.metadata.create_all(bind=engine)
87 changes: 85 additions & 2 deletions src/analytics/service.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Analytics service for tracking ticket scans, transfers, and invalid attempts."""
import json
import logging
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy import asc, desc, func
from sqlalchemy import asc, desc, func, text
from sqlalchemy.orm import Session

from src.analytics.models import (
Expand All @@ -14,8 +15,13 @@
TicketTransfer,
get_session,
)
import src.db as _db
from src.logging_config import log_error, log_info

# Simple in-memory cache: (result, expiry_timestamp)
_trending_cache: Optional[Tuple[List[Dict[str, Any]], float]] = None
_TRENDING_CACHE_TTL = 600 # 10 minutes


class AnalyticsService:
"""Service to handle analytics data storage and retrieval."""
Expand Down Expand Up @@ -355,6 +361,83 @@ def get_invalid_attempts(self, event_id: str, limit: int = 100) -> List[Dict[str
finally:
session.close()

def get_trending_events(self, limit: int = 10, hours: int = 24) -> List[Dict[str, Any]]:
"""Return top events by scan velocity over the last N hours.

Results are cached for 10 minutes to avoid repeated heavy queries.
Joins with event_sales_summary to include event names where available.
"""
global _trending_cache

# Return cached result if still valid
if _trending_cache is not None:
cached_result, expiry = _trending_cache
if time.monotonic() < expiry:
return cached_result[:limit]

engine = _db.get_engine()
if engine is None:
return []

cutoff = datetime.utcnow() - timedelta(hours=hours)
try:
with engine.connect() as conn:
# Attempt join with event_sales_summary for event names
try:
result = conn.execute(
text("""
SELECT ts.event_id,
COALESCE(ess.event_name, ts.event_id) AS event_name,
COUNT(*) AS scan_count
FROM ticket_scans ts
LEFT JOIN event_sales_summary ess
ON ts.event_id = ess.event_id
WHERE ts.scan_timestamp >= :cutoff
GROUP BY ts.event_id, ess.event_name
ORDER BY scan_count DESC
LIMIT :limit
"""),
{"cutoff": cutoff, "limit": limit},
)
rows = [
{
"event_id": row[0],
"event_name": row[1],
"scan_count": int(row[2]),
"window_hours": hours,
}
for row in result
]
except Exception:
# Fallback: query ticket_scans only (event_sales_summary may not exist)
result = conn.execute(
text("""
SELECT event_id, COUNT(*) AS scan_count
FROM ticket_scans
WHERE scan_timestamp >= :cutoff
GROUP BY event_id
ORDER BY scan_count DESC
LIMIT :limit
"""),
{"cutoff": cutoff, "limit": limit},
)
rows = [
{
"event_id": row[0],
"event_name": row[0],
"scan_count": int(row[1]),
"window_hours": hours,
}
for row in result
]
except Exception as exc:
log_error("Failed to get trending events", {"error": str(exc)})
return []

# Cache the full ordered result (up to a large limit for reuse)
_trending_cache = (rows, time.monotonic() + _TRENDING_CACHE_TTL)
return rows[:limit]

def _update_analytics_stats(self, event_id: str,
increment_scan: bool = False, is_valid: bool = True,
increment_transfer: bool = False, is_successful: bool = True,
Expand Down
3 changes: 3 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ class Settings(BaseSettings):
BQ_TABLE_DAILY_SALES: str = "daily_ticket_sales"


POOL_SIZE: int = 5
POOL_MAX_OVERFLOW: int = 10

SERVICE_API_KEY: str = "default_service_secret_change_me"
ADMIN_API_KEY: str = "default_admin_secret_change_me"

Expand Down
75 changes: 75 additions & 0 deletions src/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Centralised SQLAlchemy engine singleton with connection pooling.

All modules that need a database engine should import get_engine() and
get_session() from here rather than creating engines themselves.
"""
from __future__ import annotations

import logging
from typing import Any, Dict, Optional

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session, sessionmaker

from src.config import get_settings

logger = logging.getLogger("veritix.db")

_engine: Optional[Engine] = None


def get_engine() -> Optional[Engine]:
    """Return the shared SQLAlchemy engine, creating it lazily on first use.

    Returns None when DATABASE_URL is absent or engine creation fails.
    """
    global _engine

    # Fast path: engine already built.
    if _engine is not None:
        return _engine

    settings = get_settings()
    url = getattr(settings, "DATABASE_URL", None)
    if not url:
        logger.info("DATABASE_URL not set; skipping engine creation")
        return None

    try:
        engine = create_engine(
            url,
            pool_size=settings.POOL_SIZE,
            max_overflow=settings.POOL_MAX_OVERFLOW,
            pool_timeout=30,
            pool_recycle=1800,
            pool_pre_ping=True,
        )
        logger.info(
            "Database engine created with pool_size=%d, max_overflow=%d",
            settings.POOL_SIZE,
            settings.POOL_MAX_OVERFLOW,
        )
    except Exception as exc:
        logger.error("Failed to create database engine: %s", exc)
        return None

    _engine = engine
    return _engine


def get_session() -> Optional[Session]:
    """Create and return a new database session, or None if DB is not configured."""
    engine = get_engine()
    if engine is not None:
        # Build a factory bound to the shared engine and hand back one session.
        session_factory = sessionmaker(autocommit=False, autoflush=False, bind=engine)
        return session_factory()
    return None


def get_pool_status() -> Dict[str, Any]:
    """Return live statistics about the shared engine's connection pool."""
    engine = get_engine()
    if engine is None:
        return {"status": "unavailable", "reason": "DATABASE_URL not configured"}

    pool = engine.pool
    # Each metric name maps to the pool accessor that produces it.
    metrics = (
        ("pool_size", pool.size),
        ("checked_in", pool.checkedin),
        ("checked_out", pool.checkedout),
        ("overflow", pool.overflow),
        ("invalid", pool.invalid),
    )
    return {name: fetch() for name, fetch in metrics}
105 changes: 96 additions & 9 deletions src/etl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
String,
Table,
TIMESTAMP,
create_engine,
text,
)
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.engine import Engine
Expand All @@ -23,6 +23,7 @@

from src.logging_config import ETL_JOBS_TOTAL, log_error, log_info
from src.config import get_settings
import src.db as _db
from .extract import extract_events_and_sales

logger = logging.getLogger("veritix.etl")
Expand Down Expand Up @@ -110,14 +111,7 @@ def transform_summary(
# ---------------------------------------------------------------------------

def _pg_engine() -> Optional[Engine]:
    """Return the shared pooled engine from src.db (None when unconfigured)."""
    # Thin alias kept so existing callers in this module keep working.
    return _db.get_engine()


def load_postgres(
Expand Down Expand Up @@ -289,6 +283,99 @@ def _ensure_table(table_name: str, schema: List[Any]) -> str:
)


# ---------------------------------------------------------------------------
# ETL Diff (dry-run)
# ---------------------------------------------------------------------------

def _load_current_state(engine: Engine):
    """Fetch current summary rows from PostgreSQL.

    Returns (current_events, current_daily) lookup maps; a missing table
    simply yields an empty map (first-run case).
    """
    current_events: Dict[str, Dict[str, Any]] = {}
    current_daily: Dict[tuple, Dict[str, Any]] = {}

    with engine.connect() as conn:
        try:
            result = conn.execute(
                text("SELECT event_id, total_tickets, total_revenue FROM event_sales_summary")
            )
            for row in result:
                current_events[str(row[0])] = {
                    "total_tickets": int(row[1]) if row[1] is not None else 0,
                    "total_revenue": float(row[2]) if row[2] is not None else 0.0,
                }
        except Exception:
            pass  # table may not exist yet

        try:
            result = conn.execute(
                text("SELECT event_id, sale_date, tickets_sold, revenue FROM daily_ticket_sales")
            )
            for row in result:
                current_daily[(str(row[0]), str(row[1]))] = {
                    "tickets_sold": int(row[2]) if row[2] is not None else 0,
                    "revenue": float(row[3]) if row[3] is not None else 0.0,
                }
        except Exception:
            pass  # table may not exist yet

    return current_events, current_daily


def _count_changes(
    new_rows: List[Dict[str, Any]],
    current: Dict[Any, Dict[str, Any]],
    make_key,
    count_field: str,
    amount_field: str,
) -> Dict[str, int]:
    """Classify each transformed row as insert, update, or unchanged.

    A row counts as an update when its integer count differs or its float
    amount differs by more than half a cent (0.005 tolerance).
    """
    inserts = updates = unchanged = 0
    for row in new_rows:
        cur = current.get(make_key(row))
        if cur is None:
            inserts += 1
        elif (
            cur[count_field] != int(row.get(count_field, 0))
            or abs(cur[amount_field] - float(row.get(amount_field, 0.0))) > 0.005
        ):
            updates += 1
        else:
            unchanged += 1
    return {
        "would_insert": inserts,
        "would_update": updates,
        "unchanged": unchanged,
    }


def diff_etl_output(
    event_rows: List[Dict[str, Any]],
    daily_rows: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Compare transformed rows against the current PostgreSQL data without loading.

    Returns a summary of what the next real ETL run would insert, update, or leave
    unchanged for both the event_sales_summary and daily_ticket_sales tables.
    """
    engine = _pg_engine()
    if engine is None:
        logger.info("DATABASE_URL not set; returning empty diff")
        return {
            "events": {"would_insert": 0, "would_update": 0, "unchanged": 0},
            "daily": {"would_insert": 0, "would_update": 0, "unchanged": 0},
        }

    current_events, current_daily = _load_current_state(engine)

    return {
        "events": _count_changes(
            event_rows,
            current_events,
            lambda row: str(row.get("event_id", "")),
            "total_tickets",
            "total_revenue",
        ),
        "daily": _count_changes(
            daily_rows,
            current_daily,
            # Daily rows are keyed by (event_id, sale_date).
            lambda row: (str(row.get("event_id", "")), str(row.get("sale_date", ""))),
            "tickets_sold",
            "revenue",
        ),
    }


# ---------------------------------------------------------------------------
# Orchestration
# ---------------------------------------------------------------------------
Expand Down
Loading
Loading