Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions backend/app/api/escrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

from __future__ import annotations

import logging

from fastapi import APIRouter, HTTPException, status

from app.exceptions import (
Expand All @@ -34,6 +36,9 @@
refund_escrow,
release_escrow,
)
from app.services.onchain_cache import cache_get, cache_invalidate, cache_set

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/escrow", tags=["escrow"])

Expand Down Expand Up @@ -67,6 +72,7 @@ async def fund_escrow(body: EscrowFundRequest) -> EscrowResponse:
)
# Auto-activate after successful funding
escrow = await activate_escrow(body.bounty_id)
await cache_invalidate("escrow", body.bounty_id)
return escrow
except EscrowAlreadyExistsError as exc:
raise HTTPException(status_code=409, detail=str(exc)) from exc
Expand Down Expand Up @@ -96,10 +102,12 @@ async def release_escrow_endpoint(body: EscrowReleaseRequest) -> EscrowResponse:
moves the escrow to COMPLETED state.
"""
try:
return await release_escrow(
result = await release_escrow(
bounty_id=body.bounty_id,
winner_wallet=body.winner_wallet,
)
await cache_invalidate("escrow", body.bounty_id)
return result
except EscrowNotFoundError as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except InvalidEscrowTransitionError as exc:
Expand All @@ -123,7 +131,9 @@ async def release_escrow_endpoint(body: EscrowReleaseRequest) -> EscrowResponse:
async def refund_escrow_endpoint(body: EscrowRefundRequest) -> EscrowResponse:
"""Return escrowed $FNDRY to the bounty creator on timeout or cancellation."""
try:
return await refund_escrow(bounty_id=body.bounty_id)
result = await refund_escrow(bounty_id=body.bounty_id)
await cache_invalidate("escrow", body.bounty_id)
return result
except EscrowNotFoundError as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc
except InvalidEscrowTransitionError as exc:
Expand All @@ -141,8 +151,18 @@ async def refund_escrow_endpoint(body: EscrowRefundRequest) -> EscrowResponse:
},
)
async def get_escrow(bounty_id: str) -> EscrowStatusResponse:
"""Return the current escrow state, locked balance, and full audit trail."""
"""Return the current escrow state, locked balance, and full audit trail.

Results are cached in Redis for 30 seconds to reduce database load.
"""
cached = await cache_get("escrow", bounty_id)
if cached is not None:
return EscrowStatusResponse.model_validate(cached)

try:
return await get_escrow_status(bounty_id=bounty_id)
result = await get_escrow_status(bounty_id=bounty_id)
except EscrowNotFoundError as exc:
raise HTTPException(status_code=404, detail=str(exc)) from exc

await cache_set("escrow", bounty_id, result.model_dump(mode="json"))
return result
260 changes: 260 additions & 0 deletions backend/app/api/onchain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
"""On-chain data REST API endpoints with Redis caching.

Provides read-only endpoints that aggregate on-chain Solana state with a
30-second Redis TTL cache to limit RPC calls.

Endpoints:
- ``GET /reputation/{wallet}`` -- Reputation summary for a wallet address.
- ``GET /staking/{wallet}`` -- Staking info (SOL + FNDRY balance) for wallet.
- ``GET /treasury/stats`` -- Treasury SOL/FNDRY balance and aggregate stats.
- ``POST /webhooks/helius`` -- Cache invalidation webhook for Helius/Shyft.
"""

from __future__ import annotations

import hashlib
import hmac
import logging
import os
from typing import Annotated, Any

from fastapi import APIRouter, Header, HTTPException, Query, status
from pydantic import BaseModel, Field

from app.models.payout import TreasuryStats
from app.models.reputation import ReputationSummary
from app.services import reputation_service
from app.services.onchain_cache import (
cache_get,
cache_invalidate,
cache_invalidate_prefix,
cache_set,
)
from app.services.solana_client import (
SolanaRPCError,
get_sol_balance,
get_token_balance,
)
from app.services.treasury_service import get_treasury_stats

logger = logging.getLogger(__name__)

router = APIRouter(tags=["onchain"])

_HELIUS_WEBHOOK_SECRET = os.getenv("HELIUS_WEBHOOK_SECRET", "")


# ---------------------------------------------------------------------------
# Response models
# ---------------------------------------------------------------------------


class StakingInfo(BaseModel):
"""On-chain balances for a given wallet address."""

wallet: str
sol_balance: float = Field(..., description="Native SOL balance")
fndry_balance: float = Field(..., description="$FNDRY SPL token balance")
cached: bool = Field(False, description="True when served from cache")


class HeliusWebhookPayload(BaseModel):
"""Minimal Helius / Shyft webhook payload."""

type: str = Field("", description="Transaction type or event name")
accounts: list[str] = Field(default_factory=list)


class CacheInvalidationResponse(BaseModel):
keys_removed: int


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


async def _get_reputation_by_wallet(wallet: str) -> ReputationSummary | None:
"""Look up reputation for the contributor whose wallet matches *wallet*.

Returns ``None`` when no contributor has verified the given wallet.
"""
from app.database import async_session_factory
from app.models.user import User
from sqlalchemy import select

try:
async with async_session_factory() as session:
result = await session.execute(
select(User).where(
User.wallet_address == wallet,
User.wallet_verified.is_(True),
)
)
user = result.scalars().first()
if user is None:
return None
# contributor_id == username for the reputation store
return await reputation_service.get_reputation(user.username)
except Exception as exc:
logger.warning("wallet→reputation lookup failed: %s", exc)
return None


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------


@router.get(
"/reputation/{wallet}",
response_model=ReputationSummary,
summary="Get reputation for a wallet address",
responses={
404: {"description": "No verified contributor found for this wallet"},
503: {"description": "Upstream RPC or DB unavailable"},
},
)
async def get_reputation_by_wallet(
wallet: str,
skip: Annotated[int, Query(ge=0)] = 0,
limit: Annotated[int, Query(ge=1, le=100)] = 10,
) -> ReputationSummary:
"""Return the reputation profile of the contributor who owns *wallet*.

Results are cached for 30 seconds in Redis. The ``skip``/``limit``
parameters paginate the embedded history entries.
"""
cached: Any = await cache_get("reputation", wallet)
if cached is not None:
summary = ReputationSummary.model_validate(cached)
summary.history = summary.history[skip : skip + limit]
return summary

summary = await _get_reputation_by_wallet(wallet)
if summary is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No verified contributor found for wallet {wallet}",
)

await cache_set("reputation", wallet, summary.model_dump(mode="json"))

summary.history = summary.history[skip : skip + limit]
return summary


@router.get(
"/staking/{wallet}",
response_model=StakingInfo,
summary="Get on-chain staking balances for a wallet",
responses={
502: {"description": "Solana RPC request failed"},
},
)
async def get_staking_info(wallet: str) -> StakingInfo:
"""Return native SOL and $FNDRY balances held by *wallet*.

Results are cached for 30 seconds in Redis.
"""
cached: Any = await cache_get("staking", wallet)
if cached is not None:
return StakingInfo(**cached, cached=True)

try:
sol = await get_sol_balance(wallet)
fndry = await get_token_balance(wallet)
except SolanaRPCError as exc:
logger.error("Solana RPC error for staking/%s: %s", wallet, exc)
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"Solana RPC error: {exc}",
) from exc

payload = {"wallet": wallet, "sol_balance": sol, "fndry_balance": fndry}
await cache_set("staking", wallet, payload)
return StakingInfo(**payload)


@router.get(
"/treasury/stats",
response_model=TreasuryStats,
summary="Get live treasury statistics",
responses={
503: {"description": "Treasury data unavailable"},
},
)
async def get_treasury_stats_endpoint() -> TreasuryStats:
"""Return treasury SOL/FNDRY balances and aggregate payout totals.

Results are cached for 30 seconds in Redis (the treasury service also
maintains its own 60-second in-memory cache as a secondary layer).
"""
cached: Any = await cache_get("treasury", "stats")
if cached is not None:
return TreasuryStats.model_validate(cached)

try:
stats = await get_treasury_stats()
except Exception as exc:
logger.error("Failed to fetch treasury stats: %s", exc)
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Treasury data temporarily unavailable",
) from exc

await cache_set("treasury", "stats", stats.model_dump(mode="json"))
return stats


@router.post(
"/webhooks/helius",
response_model=CacheInvalidationResponse,
summary="Helius / Shyft webhook for cache invalidation",
status_code=status.HTTP_200_OK,
)
async def helius_webhook(
payload: HeliusWebhookPayload,
x_helius_signature: Annotated[str | None, Header()] = None,
) -> CacheInvalidationResponse:
"""Invalidate on-chain cache entries when Helius reports new transactions.

If ``HELIUS_WEBHOOK_SECRET`` is set, the ``X-Helius-Signature`` header
is verified with HMAC-SHA256. Requests with invalid signatures are
rejected with 401.

The affected cache namespaces are derived from the ``accounts`` list in
the payload: staking entries for each account are purged, and the
treasury stats key is always cleared.
"""
if _HELIUS_WEBHOOK_SECRET:
if not x_helius_signature:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing signature"
)
expected = hmac.new(
_HELIUS_WEBHOOK_SECRET.encode(),
msg=payload.model_dump_json().encode(),
digestmod=hashlib.sha256,
).hexdigest()
if not hmac.compare_digest(expected, x_helius_signature):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid signature"
)

removed = 0
for account in payload.accounts:
await cache_invalidate("staking", account)
await cache_invalidate("reputation", account)
removed += 1

# Always bust the treasury cache on any relevant transaction
removed += await cache_invalidate_prefix("treasury")

logger.info(
"Helius webhook processed: type=%s accounts=%d removed=%d",
payload.type,
len(payload.accounts),
removed,
)
return CacheInvalidationResponse(keys_removed=removed)
4 changes: 4 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from app.database import init_db, close_db
from app.api.og import router as og_router
from app.api.contributor_webhooks import router as contributor_webhooks_router
from app.api.onchain import router as onchain_router
from app.middleware.security import SecurityHeadersMiddleware
from app.middleware.sanitization import InputSanitizationMiddleware
from app.services.config_validator import install_log_filter, validate_secrets
Expand Down Expand Up @@ -399,6 +400,9 @@ async def value_error_handler(request: Request, exc: ValueError):
# Admin Dashboard: /api/admin/* (protected by ADMIN_API_KEY)
app.include_router(admin_router)

# On-chain data: /api/reputation/*, /api/staking/*, /api/treasury/*, /api/webhooks/helius
app.include_router(onchain_router, prefix="/api")


@app.post("/api/sync", tags=["admin"])
async def trigger_sync():
Expand Down
Loading
Loading