-
Notifications
You must be signed in to change notification settings - Fork 140
/
Copy pathoauth.py
85 lines (75 loc) · 2.63 KB
/
oauth.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from __future__ import annotations
import base64
from fastapi import Request
from fastapi import status
from fastapi.exceptions import HTTPException
from fastapi.openapi.models import OAuthFlowAuthorizationCode
from fastapi.openapi.models import OAuthFlowClientCredentials
from fastapi.openapi.models import OAuthFlows
from fastapi.security import OAuth2
from fastapi.security.utils import get_authorization_scheme_param
class OAuth2Scheme(OAuth2):
def __init__(
self,
authorizationUrl: str,
tokenUrl: str,
refreshUrl: str | None = None,
scheme_name: str | None = None,
scopes: dict[str, str] | None = None,
description: str | None = None,
auto_error: bool = True,
):
if not scopes:
scopes = {}
flows = OAuthFlows(
authorizationCode=OAuthFlowAuthorizationCode(
authorizationUrl=authorizationUrl,
tokenUrl=tokenUrl,
scopes=scopes,
refreshUrl=refreshUrl,
),
clientCredentials=OAuthFlowClientCredentials(
tokenUrl=tokenUrl,
scopes=scopes,
refreshUrl=refreshUrl,
),
)
super().__init__(
flows=flows,
scheme_name=scheme_name,
description=description,
auto_error=auto_error,
)
async def __call__(self, request: Request) -> str | None:
authorization = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "bearer":
if self.auto_error:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not authenticated",
headers={"WWW-Authenticate": "Bearer"},
)
else:
return None
return param
# https://developer.zendesk.com/api-reference/sales-crm/authentication/requests/#client-authentication
def get_credentials_from_basic_auth(
request: Request,
) -> dict[str, str | int] | None:
authorization = request.headers.get("Authorization")
scheme, param = get_authorization_scheme_param(authorization)
if not authorization or scheme.lower() != "basic":
return None
data = base64.b64decode(param).decode("utf-8")
if ":" not in data:
return None
split = data.split(":")
if len(split) != 2:
return None
if not split[0].isdecimal():
return None
return {
"client_id": int(split[0]),
"client_secret": split[1],
}