-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain_fastapi.py
More file actions
153 lines (115 loc) · 4.21 KB
/
main_fastapi.py
File metadata and controls
153 lines (115 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
"""
Author: Hugo
Date: 2020-01-17 23:14
Desc:
"""
import jwt
import json
import uvicorn
from jwt import PyJWTError
from passlib.context import CryptContext
from datetime import datetime, timedelta
from starlette.status import HTTP_401_UNAUTHORIZED
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from typing import Dict
from fastapi import Header
from model import (
User, UserInDB, TokenData, Token
)
from utils import config_info
SECRET_KEY = config_info("SALT_KEY")
ALGORITHM = config_info("ALGORITHM")
ACCESS_TOKEN_EXPIRE_MINUTES = int(config_info("ACCESS_TOKEN_EXPIRE_MINUTES"))
users_db = json.loads(config_info("users_db"))
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
app = FastAPI()
def password_verify(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
def password_hash(password):
return pwd_context.hash(password)
def user_get(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
def user_auth(user_db, username: str, password: str):
user = user_get(user_db, username)
if not user:
return False
if not password_verify(password, user.hashed_password):
return False
return user
def token_create(*, data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def token_explain(data: str) -> Dict:
try:
r = jwt.decode(data, SECRET_KEY, algorithm=ALGORITHM)
return {"retcode": 0, "stdout": r}
except Exception as e:
return {"retcode": 1, "stderr": f"{e}"}
async def user_current(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except PyJWTError:
raise credentials_exception
user = user_get(users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
async def user_current_active(current_user: User = Depends(user_current)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token/", response_model=Token)
async def token_apply(form_data: OAuth2PasswordRequestForm = Depends()):
user = user_auth(users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = token_create(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/token/check/")
async def token_check(user: User, access_token: str = Header(None)):
r = token_explain(access_token)
return r
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(user_current_active)):
return current_user
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(user_current_active)):
return [{"item_id": "Foo", "owner": current_user.username}]
@app.on_event("startup")
async def startup_event():
print("start up")
@app.on_event("shutdown")
async def shutdown_event():
print("shut down")
if __name__ == "__main__":
uvicorn.run(app=app,
host="0.0.0.0",
port=14000,
workers=1
)