Skip to content

Commit 494ba29

Browse files
authored
Merge pull request #200 from Cybermaxi7/feat/revenue-sharing-stakeholder-persistence
feat: comprehensive revenue sharing enhancements
2 parents 61703f0 + 19c4470 commit 494ba29

15 files changed

+1000
-383
lines changed

.coverage

0 Bytes
Binary file not shown.

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ qrcode==7.4.2
2727

2828
# Logging and Metrics
2929
prometheus-client==0.20.0
30+
slowapi==0.1.9
31+
cachetools==5.3.3
3032

src/calculation_history_store.py

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import json
2+
import logging
3+
import uuid
4+
from typing import Any, Dict, List, Optional
5+
from sqlalchemy import text
6+
from src.db import get_engine
7+
from src.revenue_sharing_models import RevenueCalculationResult, PayoutDistribution
8+
9+
logger = logging.getLogger("veritix.calculation_history_store")
10+
11+
def create_revenue_calculations_table() -> None:
12+
"""Create the revenue_calculations table if it does not yet exist."""
13+
engine = get_engine()
14+
if engine is None:
15+
logger.info("Skipping revenue_calculations table creation — no DB engine")
16+
return
17+
18+
with engine.connect() as conn:
19+
# Use FLOAT and TEXT for cross-DB compatibility (SQLite/Postgres)
20+
conn.execute(text("""
21+
CREATE TABLE IF NOT EXISTS revenue_calculations (
22+
id TEXT PRIMARY KEY,
23+
event_id TEXT NOT NULL,
24+
total_gross_sales FLOAT NOT NULL,
25+
total_fees FLOAT NOT NULL,
26+
net_revenue FLOAT NOT NULL,
27+
distributions TEXT NOT NULL,
28+
total_paid_out FLOAT NOT NULL,
29+
remaining_balance FLOAT NOT NULL,
30+
rules_applied TEXT NOT NULL,
31+
calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
32+
)
33+
"""))
34+
conn.commit()
35+
logger.info("revenue_calculations table ready")
36+
37+
def save_calculation(result: RevenueCalculationResult) -> str:
38+
"""Persist a revenue calculation result to the database."""
39+
engine = get_engine()
40+
if engine is None:
41+
return ""
42+
43+
calculation_id = str(uuid.uuid4())
44+
45+
query = text("""
46+
INSERT INTO revenue_calculations (
47+
id, event_id, total_gross_sales, total_fees, net_revenue,
48+
distributions, total_paid_out, remaining_balance, rules_applied
49+
) VALUES (
50+
:id, :event_id, :total_gross_sales, :total_fees, :net_revenue,
51+
:distributions, :total_paid_out, :remaining_balance, :rules_applied
52+
)
53+
""")
54+
55+
with engine.connect() as conn:
56+
conn.execute(query, {
57+
"id": calculation_id,
58+
"event_id": result.event_id,
59+
"total_gross_sales": result.total_gross_sales,
60+
"total_fees": result.total_fees,
61+
"net_revenue": result.net_revenue,
62+
"distributions": json.dumps([d.model_dump() for d in result.distributions]),
63+
"total_paid_out": result.total_paid_out,
64+
"remaining_balance": result.remaining_balance,
65+
"rules_applied": json.dumps(result.rules_applied)
66+
})
67+
conn.commit()
68+
69+
logger.info(f"Saved revenue calculation {calculation_id} for event {result.event_id}")
70+
return calculation_id
71+
72+
def get_history_for_event(event_id: str, page: int = 1, limit: int = 20) -> List[Dict[str, Any]]:
73+
"""Retrieve paginated calculation history for an event."""
74+
engine = get_engine()
75+
if engine is None:
76+
return []
77+
78+
offset = (page - 1) * limit
79+
80+
query = text("""
81+
SELECT id, event_id, total_gross_sales, total_fees, net_revenue,
82+
distributions, total_paid_out, remaining_balance, rules_applied, calculated_at
83+
FROM revenue_calculations
84+
WHERE event_id = :event_id
85+
ORDER BY calculated_at DESC
86+
LIMIT :limit OFFSET :offset
87+
""")
88+
89+
with engine.connect() as conn:
90+
result = conn.execute(query, {"event_id": event_id, "limit": limit, "offset": offset})
91+
history = []
92+
for row in result:
93+
history.append(_row_to_dict(row))
94+
return history
95+
96+
def get_calculation_by_id(calculation_id: str) -> Optional[Dict[str, Any]]:
97+
"""Retrieve a specific calculation by its ID."""
98+
engine = get_engine()
99+
if engine is None:
100+
return None
101+
102+
query = text("""
103+
SELECT id, event_id, total_gross_sales, total_fees, net_revenue,
104+
distributions, total_paid_out, remaining_balance, rules_applied, calculated_at
105+
FROM revenue_calculations
106+
WHERE id = :id
107+
""")
108+
109+
with engine.connect() as conn:
110+
result = conn.execute(query, {"id": calculation_id}).fetchone()
111+
if result:
112+
return _row_to_dict(result)
113+
return None
114+
115+
def _row_to_dict(row) -> Dict[str, Any]:
116+
"""Helper to convert a DB row to a dictionary with parsed JSON fields."""
117+
return {
118+
"id": row[0],
119+
"event_id": row[1],
120+
"total_gross_sales": row[2],
121+
"total_fees": row[3],
122+
"net_revenue": row[4],
123+
"distributions": json.loads(row[5]),
124+
"total_paid_out": row[6],
125+
"remaining_balance": row[7],
126+
"rules_applied": json.loads(row[8]),
127+
"calculated_at": row[9]
128+
}

src/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def validate_api_keys(cls, v: str, info) -> str:
4747
f"{info.field_name} must be at least 32 characters and must not use a default value"
4848
)
4949
return v
50+
SUPPORTED_CURRENCIES: str = "USD,NGN,GBP,EUR,KES"
5051
KNOWN_LOCATIONS: str = (
5152
"lagos,abuja,port harcourt,kano,ibadan,benin,kaduna,jos,enugu,calabar,"
5253
"owerri,warri,uyo,akure,ilorin,sokoto,zaria,maiduguri,asaba,nnewi"

src/currency_service.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import logging
2+
import requests
3+
from typing import Dict, List, Optional
4+
from cachetools import TTLCache, cached
5+
from fastapi import HTTPException, status
6+
from src.config import get_settings
7+
8+
logger = logging.getLogger("veritix.currency_service")
9+
10+
class ServiceUnavailableException(HTTPException):
11+
def __init__(self, detail: str = "Currency exchange service is currently unavailable"):
12+
super().__init__(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=detail)
13+
14+
# Cache rates for 10 minutes (600 seconds)
15+
rates_cache = TTLCache(maxsize=1, ttl=600)
16+
17+
@cached(cache=rates_cache)
18+
def get_exchange_rates() -> Dict[str, float]:
19+
"""Fetch latest exchange rates from external API relative to USD."""
20+
url = "https://api.exchangerate-api.com/v4/latest/USD"
21+
try:
22+
response = requests.get(url, timeout=10)
23+
response.raise_for_status()
24+
data = response.json()
25+
return data.get("rates", {})
26+
except Exception as exc:
27+
logger.error(f"Failed to fetch exchange rates: {exc}")
28+
raise ServiceUnavailableException()
29+
30+
def get_exchange_rate(from_currency: str, to_currency: str = "USD") -> float:
31+
"""
32+
Get the exchange rate between two currencies.
33+
34+
If from_currency is same as to_currency, returns 1.0.
35+
"""
36+
if from_currency == to_currency:
37+
return 1.0
38+
39+
supported = get_settings().SUPPORTED_CURRENCIES.split(",")
40+
if from_currency not in supported or to_currency not in supported:
41+
raise HTTPException(
42+
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
43+
detail=f"Currency not supported. Supported: {supported}"
44+
)
45+
46+
rates = get_exchange_rates()
47+
48+
if from_currency not in rates or to_currency not in rates:
49+
logger.error(f"Currency {from_currency} or {to_currency} not found in FX rates")
50+
raise ServiceUnavailableException(detail="Exchange rate for requested currency not available")
51+
52+
# Standardize to USD then to target
53+
# rate_to_usd = 1 / rates[from_currency] if from_currency != "USD" else 1.0
54+
# target_rate = rate_to_usd * rates[to_currency]
55+
56+
# Simpler since API is based on USD:
57+
# 1 USD = rates[NGN] NGN
58+
# 1 USD = rates[GBP] GBP
59+
# 1 NGN = (1 / rates[NGN]) USD
60+
# 1 NGN = (1 / rates[NGN]) * rates[GBP] GBP
61+
62+
usd_rate_for_from = rates[from_currency]
63+
usd_rate_for_to = rates[to_currency]
64+
65+
return usd_rate_for_to / usd_rate_for_from

src/main.py

Lines changed: 109 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from typing import Annotated, Any, Dict, List, Optional
1010

1111
import numpy as np
12-
from fastapi import Depends, FastAPI, HTTPException, Query, Request, WebSocket, WebSocketDisconnect
12+
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException, Query, Request, WebSocket, WebSocketDisconnect
1313
from fastapi.responses import FileResponse, JSONResponse, PlainTextResponse
1414
from fastapi.staticfiles import StaticFiles
1515
from slowapi.errors import RateLimitExceeded
@@ -57,8 +57,14 @@
5757
EventRevenueInput,
5858
RevenueCalculationResult,
5959
RevenueShareConfig,
60+
Stakeholder,
6061
)
6162
from src.revenue_sharing_service import revenue_sharing_service
63+
from src import (
64+
calculation_history_store,
65+
currency_service,
66+
stakeholder_store,
67+
)
6268
from src.recommender import (
6369
build_item_similarity_matrix,
6470
get_item_recommendations,
@@ -182,6 +188,8 @@ def on_startup() -> None:
182188
global model_pipeline, etl_scheduler
183189
settings = get_settings()
184190
create_generated_reports_table()
191+
stakeholder_store.create_stakeholders_table()
192+
calculation_history_store.create_revenue_calculations_table()
185193
if not settings.SKIP_MODEL_TRAINING:
186194
model_pipeline = train_logistic_regression_pipeline()
187195

@@ -586,7 +594,10 @@ def recommend_events(payload: RecommendRequest) -> RecommendResponse:
586594
# ---------------------------------------------------------------------------
587595

588596
@app.post("/calculate-revenue-share", response_model=RevenueCalculationResult)
589-
def calculate_revenue_share(input_data: EventRevenueInput) -> RevenueCalculationResult:
597+
def calculate_revenue_share(
598+
input_data: EventRevenueInput,
599+
background_tasks: BackgroundTasks,
600+
) -> RevenueCalculationResult:
590601
"""Calculate revenue shares for stakeholders based on event sales."""
591602
log_info("Revenue share calculation requested", {
592603
"event_id": input_data.event_id,
@@ -599,30 +610,42 @@ def calculate_revenue_share(input_data: EventRevenueInput) -> RevenueCalculation
599610
raise HTTPException(status_code=400, detail={"errors": errors})
600611
try:
601612
result = revenue_sharing_service.calculate_revenue_shares(input_data)
613+
background_tasks.add_task(calculation_history_store.save_calculation, result)
602614
return result
615+
except HTTPException:
616+
raise
603617
except Exception as exc:
604618
log_error("Revenue share calculation failed", {"error": str(exc)})
605619
raise HTTPException(status_code=500, detail=f"Revenue calculation failed: {exc}")
606620

607621

608622
@app.post("/calculate-revenue-share/batch", response_model=List[RevenueCalculationResult])
609623
def calculate_revenue_share_batch(
610-
inputs: List[EventRevenueInput],
624+
inputs: List[Dict[str, Any]],
625+
background_tasks: BackgroundTasks,
611626
) -> List[RevenueCalculationResult]:
612-
"""Calculate revenue shares for multiple events."""
627+
"""Calculate revenue shares for multiple events, skipping invalid ones."""
613628
log_info("Batch revenue share calculation requested", {"event_count": len(inputs)})
614629
results: List[RevenueCalculationResult] = []
615-
for input_data in inputs:
630+
for item in inputs:
616631
try:
632+
# Validate model first
633+
input_data = EventRevenueInput.model_validate(item)
634+
# Run service-level validation
617635
is_valid, errors = revenue_sharing_service.validate_input(input_data)
618636
if not is_valid:
637+
log_info("Skipping invalid event in batch", {"event_id": item.get("event_id"), "errors": errors})
619638
continue
620-
results.append(revenue_sharing_service.calculate_revenue_shares(input_data))
639+
640+
result = revenue_sharing_service.calculate_revenue_shares(input_data)
641+
background_tasks.add_task(calculation_history_store.save_calculation, result)
642+
results.append(result)
621643
except Exception as exc:
622-
log_error("Batch revenue calculation failed", {
623-
"event_id": input_data.event_id,
644+
log_error("Batch revenue calculation partially failed", {
645+
"event_id": item.get("event_id"),
624646
"error": str(exc),
625647
})
648+
continue
626649
log_info("Batch calculation completed", {
627650
"processed_count": len(results),
628651
"requested_count": len(inputs),
@@ -637,6 +660,17 @@ def get_revenue_share_config() -> RevenueShareConfig:
637660
return revenue_sharing_service.config
638661

639662

663+
@app.get("/revenue-share/exchange-rates")
664+
def get_exchange_rates() -> Dict[str, float]:
665+
"""Return the current currency exchange rates relative to USD."""
666+
log_info("Currency exchange rates requested")
667+
try:
668+
return currency_service.get_exchange_rates()
669+
except Exception as exc:
670+
log_error("Failed to fetch exchange rates", {"error": str(exc)})
671+
raise HTTPException(status_code=503, detail="Exchange rate service unavailable")
672+
673+
640674
@app.get("/revenue-share/example", response_model=EventRevenueInput)
641675
def get_example_revenue_input() -> EventRevenueInput:
642676
"""Return an example revenue calculation input."""
@@ -650,6 +684,73 @@ def get_example_revenue_input() -> EventRevenueInput:
650684
)
651685

652686

687+
@app.post("/revenue-share/stakeholders/{event_id}")
688+
def save_stakeholders(
689+
event_id: str,
690+
stakeholders: List[Stakeholder],
691+
_: str = Depends(require_admin_key),
692+
) -> Dict[str, Any]:
693+
"""Save custom stakeholder configuration for an event (ADMIN)."""
694+
log_info("Saving custom stakeholders", {"event_id": event_id, "count": len(stakeholders)})
695+
try:
696+
stakeholder_store.save_stakeholders_for_event(event_id, stakeholders)
697+
return {"success": True, "message": f"Saved {len(stakeholders)} stakeholders for event {event_id}"}
698+
except Exception as exc:
699+
log_error("Failed to save stakeholders", {"event_id": event_id, "error": str(exc)})
700+
raise HTTPException(status_code=500, detail=f"Failed to save stakeholders: {exc}")
701+
702+
703+
@app.get("/revenue-share/stakeholders/{event_id}", response_model=List[Stakeholder])
704+
def get_stakeholders(event_id: str) -> List[Stakeholder]:
705+
"""Return the stakeholder configuration for an event (custom or default)."""
706+
log_info("Retrieving stakeholders", {"event_id": event_id})
707+
try:
708+
stakeholders = stakeholder_store.get_stakeholders_for_event(event_id)
709+
if not stakeholders:
710+
# Fallback logic duplicated here for the GET endpoint
711+
stakeholders = revenue_sharing_service._get_default_stakeholders(event_id)
712+
return stakeholders
713+
except Exception as exc:
714+
log_error("Failed to retrieve stakeholders", {"event_id": event_id, "error": str(exc)})
715+
raise HTTPException(status_code=500, detail=f"Failed to retrieve stakeholders: {exc}")
716+
717+
718+
@app.get("/revenue-share/history/{event_id}")
719+
def get_revenue_history(
720+
event_id: str,
721+
page: int = Query(1, ge=1),
722+
limit: int = Query(20, ge=1, le=100),
723+
_: str = Depends(require_admin_key),
724+
) -> List[Dict[str, Any]]:
725+
"""Retrieve paginated calculation history for an event (ADMIN)."""
726+
log_info("Retrieving revenue history", {"event_id": event_id, "page": page, "limit": limit})
727+
try:
728+
return calculation_history_store.get_history_for_event(event_id, page, limit)
729+
except Exception as exc:
730+
log_error("Failed to retrieve history", {"event_id": event_id, "error": str(exc)})
731+
raise HTTPException(status_code=500, detail=f"Failed to retrieve history: {exc}")
732+
733+
734+
@app.get("/revenue-share/history/{event_id}/{calculation_id}")
735+
def get_calculation_detail(
736+
event_id: str,
737+
calculation_id: str,
738+
_: str = Depends(require_admin_key),
739+
) -> Dict[str, Any]:
740+
"""Retrieve a specific stored calculation by ID (ADMIN)."""
741+
log_info("Retrieving calculation detail", {"event_id": event_id, "calculation_id": calculation_id})
742+
try:
743+
detail = calculation_history_store.get_calculation_by_id(calculation_id)
744+
if not detail or detail["event_id"] != event_id:
745+
raise HTTPException(status_code=404, detail="Calculation not found for this event")
746+
return detail
747+
except HTTPException:
748+
raise
749+
except Exception as exc:
750+
log_error("Failed to retrieve calculation detail", {"calculation_id": calculation_id, "error": str(exc)})
751+
raise HTTPException(status_code=500, detail=f"Failed to retrieve detail: {exc}")
752+
753+
653754
# ---------------------------------------------------------------------------
654755
# Daily report
655756
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)