|
| 1 | +import datetime |
| 2 | +from typing import Optional |
| 3 | + |
| 4 | +from fastapi import Depends, HTTPException, status |
| 5 | +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm |
| 6 | +from jose import jwt, JWTError |
| 7 | +from pydantic import BaseModel |
| 8 | +from sqlalchemy.orm import Session |
| 9 | + |
| 10 | +from app.config import get_config |
| 11 | +from app.models.user import User |
| 12 | +from app.database import get_db |
| 13 | +from werkzeug.security import generate_password_hash, check_password_hash |
| 14 | + |
| 15 | +from . import public_api |
| 16 | + |
| 17 | +config = get_config() |
| 18 | +SECRET_KEY = config.SECRET_KEY |
| 19 | +ALGORITHM = "HS256" |
| 20 | +ACCESS_TOKEN_EXPIRE_MINUTES = 60 |
| 21 | + |
| 22 | +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="api/login") |
| 23 | + |
| 24 | +class Token(BaseModel): |
| 25 | + access_token: str |
| 26 | + token_type: str |
| 27 | + |
| 28 | +class UserLogin(BaseModel): |
| 29 | + email: str |
| 30 | + password: str |
| 31 | + |
| 32 | +class UserSignup(BaseModel): |
| 33 | + email: str |
| 34 | + password: str |
| 35 | + password2: str |
| 36 | + newsletter: bool = False |
| 37 | + |
| 38 | +def create_access_token(data: dict, expires_delta: Optional[datetime.timedelta] = None): |
| 39 | + to_encode = data.copy() |
| 40 | + if expires_delta: |
| 41 | + expire = datetime.datetime.utcnow() + expires_delta |
| 42 | + else: |
| 43 | + expire = datetime.datetime.utcnow() + datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) |
| 44 | + to_encode.update({"exp": expire}) |
| 45 | + # Encode the token using python-jose |
| 46 | + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) |
| 47 | + return encoded_jwt |
| 48 | + |
| 49 | +async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): |
| 50 | + credentials_exception = HTTPException( |
| 51 | + status_code=status.HTTP_401_UNAUTHORIZED, |
| 52 | + detail="Could not validate credentials", |
| 53 | + headers={"WWW-Authenticate": "Bearer"}, |
| 54 | + ) |
| 55 | + try: |
| 56 | + # Decode token using python-jose |
| 57 | + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) |
| 58 | + email: str = payload.get("sub") |
| 59 | + if email is None: |
| 60 | + raise credentials_exception |
| 61 | + except JWTError: |
| 62 | + raise credentials_exception |
| 63 | + |
| 64 | + user = db.query(User).filter(User.email == email).first() |
| 65 | + if user is None: |
| 66 | + raise credentials_exception |
| 67 | + return user |
| 68 | + |
| 69 | +@public_api.post("/login", response_model=Token) |
| 70 | +def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): |
| 71 | + email = form_data.username |
| 72 | + password = form_data.password |
| 73 | + user = db.query(User).filter_by(email=email).first() |
| 74 | + if user is None or not check_password_hash(user.password, password): |
| 75 | + raise HTTPException(status_code=401, detail="Invalid email or password") |
| 76 | + |
| 77 | + access_token_expires = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) |
| 78 | + access_token = create_access_token(data={"sub": user.email}, expires_delta=access_token_expires) |
| 79 | + return {"access_token": access_token, "token_type": "bearer"} |
| 80 | + |
| 81 | +@public_api.post("/signup") |
| 82 | +def signup(data: UserSignup, db: Session = Depends(get_db)): |
| 83 | + # Check if there is already a user registered (one-user system) |
| 84 | + if db.query(User).first() is not None: |
| 85 | + raise HTTPException(status_code=400, detail="Only one user is allowed. Please login!") |
| 86 | + |
| 87 | + if not data.email: |
| 88 | + raise HTTPException(status_code=400, detail="Email is required.") |
| 89 | + if not data.password: |
| 90 | + raise HTTPException(status_code=400, detail="Password is required.") |
| 91 | + if data.password != data.password2: |
| 92 | + raise HTTPException(status_code=400, detail="Passwords do not match.") |
| 93 | + |
| 94 | + if db.query(User).filter_by(email=data.email).first(): |
| 95 | + raise HTTPException(status_code=400, detail="Email already in use.") |
| 96 | + |
| 97 | + # Handle newsletter signup if needed |
| 98 | + |
| 99 | + user = User(email=data.email) |
| 100 | + user.password = generate_password_hash(data.password) |
| 101 | + db.add(user) |
| 102 | + db.commit() |
| 103 | + |
| 104 | + # Auto-login after signup: |
| 105 | + access_token_expires = datetime.timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) |
| 106 | + access_token = create_access_token(data={"sub": user.email}, expires_delta=access_token_expires) |
| 107 | + return {"success": True, "access_token": access_token, "token_type": "bearer"} |
0 commit comments