-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathoauth2.py
147 lines (127 loc) · 4.96 KB
/
oauth2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import os
import requests
import json
import secrets
from datetime import datetime, timedelta
from flask import Blueprint, redirect, request, render_template, session
from config import CLIENT_ID, CLIENT_SECRET, REDIRECT_URI, AUTHORIZATION_URL, TOKEN_URL
from api_integration import get_contact_lists
oauth2_blueprint = Blueprint('oauth2', __name__)
TOKEN_FILE = 'tokens.json'
def ensure_token_file_exists():
if not os.path.exists(TOKEN_FILE):
with open(TOKEN_FILE, "w") as file:
json.dump({
"access_token": "",
"refresh_token": "",
"expires_in": 0
}, file)
# Call this function at the beginning to ensure tokens.json exists
ensure_token_file_exists()
def save_tokens_to_file(access_token, refresh_token, expires_in):
expiration_time = datetime.now() + timedelta(seconds=expires_in)
token_data = {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_at': expiration_time.strftime('%Y-%m-%d %H:%M:%S')
}
with open(TOKEN_FILE, 'w') as f:
json.dump(token_data, f)
def load_tokens_from_file():
try:
with open(TOKEN_FILE, 'r') as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return None
def is_token_expired(token_data):
expires_at = datetime.strptime(token_data['expires_at'], '%Y-%m-%d %H:%M:%S')
return datetime.now() > expires_at - timedelta(minutes=5) # Buffer of 5 minutes
def refresh_access_token():
token_data = load_tokens_from_file()
refresh_token = token_data.get('refresh_token')
if not refresh_token:
return None
data = {
'grant_type': 'refresh_token',
'refresh_token': refresh_token,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'scope': 'contact_data'
}
response = requests.post(TOKEN_URL, data=data)
if response.status_code == 200:
new_tokens = response.json()
save_tokens_to_file(new_tokens['access_token'], new_tokens['refresh_token'], new_tokens['expires_in'])
return new_tokens['access_token']
else:
handle_error(response)
return None
def get_access_token():
token_data = load_tokens_from_file()
if not token_data:
return None
if is_token_expired(token_data):
return refresh_access_token()
else:
return token_data.get('access_token')
@oauth2_blueprint.route('/authorize')
def authorize():
# Generate a unique state value
state = secrets.token_urlsafe(16)
session['state'] = state # Save state value in the user's session
# Construct the full authorization URL
full_auth_url = (f"{AUTHORIZATION_URL}?response_type=code"
f"&client_id={CLIENT_ID}"
f"&redirect_uri={REDIRECT_URI}"
f"&scope=contact_data offline_access"
f"&state={state}")
return redirect(full_auth_url)
@oauth2_blueprint.route('/callback')
def callback():
code = request.args.get('code')
returned_state = request.args.get('state')
# Validate the state value
if 'state' not in session or session['state'] != returned_state:
return render_template('error.html', error_message="State validation failed.")
token_data = {
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': REDIRECT_URI,
'client_id': CLIENT_ID,
'client_secret': CLIENT_SECRET,
'scope': 'contact_data'
}
response = requests.post(TOKEN_URL, data=token_data)
if response.status_code == 200:
data = response.json()
save_tokens_to_file(data['access_token'], data['refresh_token'], data['expires_in'])
return render_template('success.html')
else:
handle_error(response)
return render_template('error.html', error_message=f"Error during authorization: {response.text}")
@oauth2_blueprint.route('/is_authenticated')
def is_authenticated():
token = get_access_token()
if token and get_contact_lists(token):
return {"authenticated": True}
else:
return {"authenticated": False}
def handle_error(response):
if response.status_code == 400:
# Handle "Bad Request" error
print("Error 400: Bad Request - ", response.text)
elif response.status_code == 401:
# Handle "Unauthorized" error
print("Error 401: Unauthorized - ", response.text)
# Ideally, you'd trigger a token refresh here if it's an auth error.
elif response.status_code == 403:
# Handle "Forbidden" error
print("Error 403: Forbidden - ", response.text)
elif response.status_code == 404:
# Handle "Not Found" error
print("Error 404: Not Found - ", response.text)
elif response.status_code == 500:
# Handle "Internal Server Error"
print("Error 500: Internal Server Error - ", response.text)
else:
print(f"Error {response.status_code}: ", response.text)