-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #92 from HackAtUCI/feature/guest-auth
Implement guest authentication with passphrases
- Loading branch information
Showing
7 changed files
with
437 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
import hashlib | ||
import hmac | ||
import os | ||
import secrets | ||
from datetime import datetime, timedelta | ||
from typing import Optional | ||
|
||
from pydantic import BaseModel, EmailStr | ||
|
||
from auth import user_identity | ||
from auth.user_identity import GuestUser, utc_now | ||
from services import mongodb_handler | ||
from services.mongodb_handler import BaseRecord, Collection | ||
|
||
# from utils import email_handler | ||
|
||
AUTH_KEY_SALT = os.getenv("AUTH_KEY_SALT", "")[:16].encode() | ||
PASSPHRASE_LENGTH = 4 | ||
WORD_LIST: list[str] = [] | ||
T_GUEST_TOKEN = timedelta(minutes=10) | ||
|
||
|
||
class GuestAuth(BaseModel): | ||
iat: datetime | ||
exp: datetime | ||
key: str | ||
|
||
|
||
class GuestRecord(BaseRecord): | ||
guest_auth: GuestAuth | ||
|
||
|
||
async def initiate_guest_login(email: EmailStr) -> Optional[str]: | ||
"""Generate a login passphrase to be emailed, save the authentication key, | ||
and a confirmation token to be saved as a cookie.""" | ||
if await _get_existing_key(email): | ||
# To prevent spammed requests, user must wait until previous key expires | ||
return None | ||
|
||
confirmation = _generate_confirmation_token() | ||
passphrase = await _generate_passphrase(PASSPHRASE_LENGTH) | ||
auth_key = _generate_key(confirmation, passphrase) | ||
|
||
uid = user_identity.scoped_uid(email) | ||
now = utc_now() | ||
exp = now + T_GUEST_TOKEN | ||
|
||
guest = GuestRecord( | ||
uid=uid, | ||
guest_auth=GuestAuth(iat=now, exp=exp, key=auth_key), | ||
) | ||
|
||
await _save_guest_key(guest) | ||
print(email, passphrase) | ||
# await email_handler.send_guest_login_email(email, passphrase) | ||
|
||
return confirmation | ||
|
||
|
||
async def verify_guest_credentials( | ||
email: EmailStr, passphrase: str, confirmation: str | ||
) -> bool: | ||
"""Check that passphrase and confirmation are valid for the given user.""" | ||
key = await _get_existing_key(email) | ||
if not key: | ||
return False | ||
|
||
# TODO: delete used key | ||
return _validate(key, passphrase, confirmation) | ||
|
||
|
||
def acquire_guest_identity(email: EmailStr) -> GuestUser: | ||
"""Provide a user identity for the given guest.""" | ||
return GuestUser(email=email) | ||
|
||
|
||
async def _get_existing_key(email: EmailStr) -> Optional[str]: | ||
"""Retrieve guest authentication key, `None` if expired.""" | ||
uid = user_identity.scoped_uid(email) | ||
record = await mongodb_handler.retrieve_one( | ||
Collection.USERS, {"_id": uid}, ["guest_auth"] | ||
) | ||
|
||
if not record or not record["guest_auth"]: | ||
return None | ||
|
||
auth = GuestAuth.model_validate(record["guest_auth"]) | ||
|
||
# Reject expired key | ||
now = utc_now() | ||
if now > auth.exp: | ||
await _remove_guest_key(uid) | ||
return None | ||
|
||
return auth.key | ||
|
||
|
||
async def _save_guest_key(guest: GuestRecord) -> None: | ||
"""Save guest authentication key to user record.""" | ||
await mongodb_handler.update_one( | ||
Collection.USERS, {"_id": guest.uid}, guest.dict(), upsert=True | ||
) | ||
|
||
|
||
async def _remove_guest_key(uid: str) -> None: | ||
await mongodb_handler.update_one( | ||
Collection.USERS, | ||
{"_id": uid}, | ||
{"guest_auth": None, "last_login": utc_now()}, | ||
) | ||
|
||
|
||
def _generate_confirmation_token() -> str: | ||
"""Generate a confirmation token to use for guest authentication.""" | ||
return secrets.token_urlsafe() | ||
|
||
|
||
async def _generate_passphrase(length: int) -> str: | ||
"""Generate a secret passphrase to use for guest authentication.""" | ||
words = await _get_word_list() | ||
return "-".join(secrets.choice(words) for _ in range(length)) | ||
|
||
|
||
def _generate_key(confirmation: str, passphrase: str) -> str: | ||
"""Generate a key from a passphrase and confirmation token.""" | ||
content = confirmation + passphrase | ||
return hashlib.blake2b(content.encode(), salt=AUTH_KEY_SALT).hexdigest() | ||
|
||
|
||
def _validate(key: str, passphrase: str, confirmation: str) -> bool: | ||
"""Validate a passphrase, confirmation token, and authentication key.""" | ||
digest = _generate_key(confirmation, passphrase) | ||
return hmac.compare_digest(key, digest) | ||
|
||
|
||
async def _get_word_list() -> list[str]: | ||
"""Fetch list of words to use for passphrase generation from MongoDB.""" | ||
global WORD_LIST | ||
if not WORD_LIST: | ||
record: Optional[dict[str, list[str]]] = await mongodb_handler.retrieve_one( | ||
Collection.SETTINGS, {"_id": "word_list"} | ||
) | ||
if not record: | ||
raise RuntimeError("Guest authentication word list is not available") | ||
WORD_LIST = record["words"] | ||
return WORD_LIST |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
from logging import getLogger | ||
from typing import Annotated | ||
from urllib.parse import urlencode | ||
|
||
from fastapi import APIRouter, Cookie, Depends, Form, HTTPException, status | ||
from fastapi.responses import RedirectResponse | ||
from pydantic import EmailStr | ||
|
||
from auth import guest_auth, user_identity | ||
|
||
log = getLogger(__name__) | ||
|
||
router = APIRouter() | ||
|
||
|
||
def guest_email(email: Annotated[EmailStr, Form()]) -> EmailStr: | ||
"""Require a university guest (non-UCI) email as a form field.""" | ||
if user_identity.uci_email(email): | ||
log.info("%s attempted to log in as guest.", email) | ||
raise HTTPException( | ||
status.HTTP_403_FORBIDDEN, "UCI affiliates must log in with SSO." | ||
) | ||
if email.endswith("@irvinehacks.com"): | ||
# TODO: sponsor authentication | ||
raise HTTPException(status.HTTP_501_NOT_IMPLEMENTED) | ||
if not email.endswith(".edu"): | ||
log.info("%s attempted to log in as guest without a .edu address.", email) | ||
# raise HTTPException( | ||
# status.HTTP_403_FORBIDDEN, "Only .edu emails are allowed to log in." | ||
# ) | ||
return email | ||
|
||
|
||
@router.post("/login") | ||
async def guest_login( | ||
email: Annotated[EmailStr, Depends(guest_email)] | ||
) -> RedirectResponse: | ||
"""Generate login passphrase and set cookie with confirmation token. | ||
The initiation will send an email with the passphrase.""" | ||
try: | ||
confirmation = await guest_auth.initiate_guest_login(email) | ||
except RuntimeError as err: | ||
log.exception("During guest login: %s", err) | ||
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR) | ||
|
||
if not confirmation: | ||
raise HTTPException(status.HTTP_429_TOO_MANY_REQUESTS) | ||
|
||
# Redirect to guest login page on client | ||
# which displays a message to check email and enter passphrase | ||
query = urlencode({"email": email}) | ||
response = RedirectResponse(f"/guest-login?{query}", status.HTTP_303_SEE_OTHER) | ||
response.set_cookie( | ||
"guest_confirmation", confirmation, max_age=600, secure=True, httponly=True | ||
) | ||
return response | ||
|
||
|
||
@router.post("/verify") | ||
async def verify_guest( | ||
email: Annotated[EmailStr, Depends(guest_email)], | ||
passphrase: Annotated[str, Form()], | ||
guest_confirmation: Annotated[str, Cookie()], | ||
) -> RedirectResponse: | ||
"""Verify guest token""" | ||
if not await guest_auth.verify_guest_credentials( | ||
email, passphrase, guest_confirmation | ||
): | ||
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Unauthorized") | ||
|
||
log.info("%s authenticated as guest.", email) | ||
guest = guest_auth.acquire_guest_identity(email) | ||
|
||
res = RedirectResponse("/portal", status_code=status.HTTP_303_SEE_OTHER) | ||
user_identity.issue_user_identity(guest, res) | ||
return res |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
from datetime import datetime | ||
from unittest.mock import AsyncMock, Mock, patch | ||
|
||
from fastapi import FastAPI | ||
from fastapi.testclient import TestClient | ||
|
||
from auth import guest_auth | ||
from auth.guest_auth import GuestAuth, GuestRecord | ||
from routers import guest | ||
|
||
app = FastAPI() | ||
app.include_router(guest.router) | ||
|
||
client = TestClient(app) | ||
|
||
SAMPLE_EMAIL = "[email protected]" | ||
SAMPLE_LOGIN_DATA = {"email": SAMPLE_EMAIL} | ||
SAMPLE_PASSPHRASE = "correct-horse-battery-staple" | ||
|
||
|
||
# def test_non_edu_email_forbidden() -> None: | ||
# """Test that a guest with a non-edu email is forbidden from logging in.""" | ||
# res = client.post("/login", data={"email": "[email protected]"}) | ||
# assert res.status_code == 403 | ||
|
||
|
||
def test_uci_email_forbidden_as_guest() -> None: | ||
"""Test that a UCI email cannot be used with guest authentication.""" | ||
res = client.post("/login", data={"email": "[email protected]"}) | ||
assert res.status_code == 403 | ||
|
||
|
||
@patch("utils.email_handler.send_guest_login_email", autospec=True) | ||
@patch("auth.guest_auth._save_guest_key", autospec=True) | ||
@patch("auth.guest_auth.utc_now", autospec=True) | ||
@patch("auth.guest_auth._generate_passphrase", autospec=True) | ||
@patch("auth.guest_auth._generate_confirmation_token", autospec=True) | ||
@patch("auth.guest_auth._get_existing_key", autospec=True) | ||
def test_guest_login_initiation( | ||
mock_get_existing_key: AsyncMock, | ||
mock_generate_confirmation_token: Mock, | ||
mock_generate_passphrase: Mock, | ||
mock_utc_now: Mock, | ||
mock_save_guest_key: AsyncMock, | ||
mock_send_guest_login_email: AsyncMock, | ||
) -> None: | ||
"""Test full guest login initiation flow.""" | ||
|
||
mock_get_existing_key.return_value = None | ||
mock_generate_confirmation_token.return_value = "abcdef" | ||
mock_generate_passphrase.return_value = SAMPLE_PASSPHRASE | ||
mock_utc_now.return_value = datetime(2023, 2, 4) | ||
|
||
res = client.post("/login", data=SAMPLE_LOGIN_DATA, follow_redirects=False) | ||
|
||
mock_save_guest_key.assert_awaited_once_with( | ||
GuestRecord( | ||
uid="edu.caltech.beaver", | ||
guest_auth=GuestAuth( | ||
iat=datetime(2023, 2, 4), | ||
exp=datetime(2023, 2, 4, 0, 10, 0), | ||
key=guest_auth._generate_key("abcdef", SAMPLE_PASSPHRASE), | ||
), | ||
) | ||
) | ||
# mock_send_guest_login_email.assert_awaited_once_with( | ||
# SAMPLE_EMAIL, SAMPLE_PASSPHRASE | ||
# ) | ||
|
||
assert res.status_code == 303 | ||
assert res.headers["location"] == "/guest-login?email=beaver%40caltech.edu" | ||
assert res.headers["Set-Cookie"].startswith("guest_confirmation=abcdef;") | ||
|
||
|
||
@patch("auth.guest_auth._get_existing_key", autospec=True) | ||
def test_requesting_login_when_previous_key_exists_causes_429( | ||
mock_get_existing_key: AsyncMock, | ||
) -> None: | ||
"""Test that requesting to log in as guest when the user has an existing, | ||
unexpired key causes status 429.""" | ||
|
||
mock_get_existing_key.return_value = "some-existing-key" | ||
res = client.post("/login", data=SAMPLE_LOGIN_DATA) | ||
assert res.status_code == 429 | ||
|
||
|
||
@patch("auth.guest_auth._get_existing_key", autospec=True) | ||
def test_successful_guest_verification_provides_identity( | ||
mock_get_existing_key: AsyncMock, | ||
) -> None: | ||
"""Test a guest successfully verifying guest credentials.""" | ||
mock_get_existing_key.return_value = guest_auth._generate_key( | ||
"some-confirmation", SAMPLE_PASSPHRASE | ||
) | ||
|
||
res = client.post( | ||
"/verify", | ||
data={"email": SAMPLE_EMAIL, "passphrase": SAMPLE_PASSPHRASE}, | ||
cookies={"guest_confirmation": "some-confirmation"}, | ||
follow_redirects=False, | ||
) | ||
|
||
assert res.status_code == 303 | ||
assert res.headers["Set-Cookie"].startswith("irvinehacks_auth=") | ||
|
||
|
||
@patch("auth.guest_auth._get_existing_key", autospec=True) | ||
def test_invalid_guest_verification_is_unauthorized( | ||
mock_get_existing_key: AsyncMock, | ||
) -> None: | ||
"""Test that a guest with invalid credentials is unauthorized.""" | ||
mock_get_existing_key.return_value = "some-existing-key" | ||
|
||
res = client.post( | ||
"/verify", | ||
data={"email": SAMPLE_EMAIL, "passphrase": "bad-passphrase"}, | ||
cookies={"guest_confirmation": "not-a-confirmation"}, | ||
) | ||
|
||
assert res.status_code == 401 |
Oops, something went wrong.