-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_utils.py
More file actions
193 lines (177 loc) · 7.56 KB
/
api_utils.py
File metadata and controls
193 lines (177 loc) · 7.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
import json
import requests
from api_config import API_BASE_URL
# Define function to call API endpoint to add user
def api_add_user(
    user_first_name,
    user_last_name,
    user_email,
    user_password,
    *,
    timeout=30,
):
    """Create a user by POSTing to the API's /user endpoint.

    Args:
        user_first_name: Sent as "user_first_name" in the JSON body.
        user_last_name: Sent as "user_last_name" in the JSON body.
        user_email: Sent as "user_email" in the JSON body.
        user_password: Sent as "user_password" in the JSON body.
        timeout: Seconds handed to ``requests.post`` as its ``timeout``.
            Pass ``None`` to wait indefinitely (the behaviour before this
            parameter existed).

    Returns:
        The "user_id" value of the JSON response when the endpoint answers
        with status 200. ``None`` when the status is not 200, the key is
        absent, or an error occurred. Errors are printed, never raised.
    """
    # Default result when the call does not succeed
    user_id = None
    # Body expected by the /user endpoint
    user_data = {
        "user_first_name": user_first_name,
        "user_last_name": user_last_name,
        "user_email": user_email,
        "user_password": user_password,
    }
    try:
        # Send POST request to /user endpoint, passing user_data in JSON format
        api_response = requests.post(
            f"{API_BASE_URL}/user",
            data=json.dumps(user_data),
            headers={"content-type": "application/json"},
            # Without a timeout a stalled server would block the caller forever
            timeout=timeout,
        )
        # Only a 200 response carries a usable user_id
        if api_response.status_code == 200:
            user_id = api_response.json().get("user_id", None)
    except requests.exceptions.RequestException as e:
        # Report (do not propagate) connection failures and timeouts
        print(f"An error occurred with the Movie Recommender API: {e}")
    except Exception as e:
        # Boundary handler, e.g. a response body that is not a JSON object
        print(f"An unexpected error occurred: {e}")
    # Return user_id or None if call to endpoint unsuccessful
    return user_id
# Define function to call API endpoint to get user
def api_get_user(user_email, user_password, *, timeout=30):
    """Look up a user by sending a GET request to the API's /user endpoint.

    Args:
        user_email: Sent as "user_email" in the JSON body.
        user_password: Sent as "user_password" in the JSON body.
        timeout: Seconds handed to ``requests.get`` as its ``timeout``.
            Pass ``None`` to wait indefinitely (the behaviour before this
            parameter existed).

    Returns:
        The "user_id" value of the JSON response when the endpoint answers
        with status 200. ``None`` when the status is not 200, the key is
        absent, or an error occurred. Errors are printed, never raised.
    """
    # Default result when the call does not succeed
    user_id = None
    # Credentials expected by the /user endpoint
    user_data = {
        "user_email": user_email,
        "user_password": user_password,
    }
    try:
        # Send GET request to /user endpoint, passing user_data in JSON format.
        # NOTE(review): the credentials travel in the body of a GET request;
        # confirm every hop between client and API forwards GET bodies.
        api_response = requests.get(
            f"{API_BASE_URL}/user",
            data=json.dumps(user_data),
            headers={"content-type": "application/json"},
            # Without a timeout a stalled server would block the caller forever
            timeout=timeout,
        )
        # Only a 200 response carries a usable user_id
        if api_response.status_code == 200:
            user_id = api_response.json().get("user_id", None)
    except requests.exceptions.RequestException as e:
        # Report (do not propagate) connection failures and timeouts
        print(f"An error occurred with the Movie Recommender API: {e}")
    except Exception as e:
        # Boundary handler, e.g. a response body that is not a JSON object
        print(f"An unexpected error occurred: {e}")
    # Return user_id or None if call to endpoint unsuccessful
    return user_id
# Define function to call API endpoint to add user's top 5 movies
def api_add_user_movie_top_5(user_id, movie_names, *, timeout=30):
    """Submit a user's movie names to /user/{user_id}/movie/top_5.

    Args:
        user_id: Interpolated into the endpoint path.
        movie_names: Sent as "movie_names" in the JSON body; must be
            serialisable by ``json.dumps``.
        timeout: Seconds handed to ``requests.post`` as its ``timeout``.
            Pass ``None`` to wait indefinitely (the behaviour before this
            parameter existed).

    Returns:
        The "update_count" value of the JSON response when the endpoint
        answers with status 200. ``None`` when the status is not 200, the
        key is absent, or an error occurred. Errors are printed, never raised.
    """
    # Default result when the call does not succeed
    update_count = None
    # Body expected by the top_5 endpoint
    movie_data = {
        "movie_names": movie_names,
    }
    try:
        # Send POST request to /user/{user_id}/movie/top_5 endpoint,
        # passing movie_data in JSON format
        api_response = requests.post(
            f"{API_BASE_URL}/user/{user_id}/movie/top_5",
            data=json.dumps(movie_data),
            headers={"content-type": "application/json"},
            # Without a timeout a stalled server would block the caller forever
            timeout=timeout,
        )
        # Only a 200 response carries a usable update_count
        if api_response.status_code == 200:
            update_count = api_response.json().get("update_count", None)
    except requests.exceptions.RequestException as e:
        # Report (do not propagate) connection failures and timeouts
        print(f"An error occurred with the Movie Recommender API: {e}")
    except Exception as e:
        # Boundary handler, e.g. unserialisable input or a non-object JSON body
        print(f"An unexpected error occurred: {e}")
    # Return update_count or None if call to endpoint unsuccessful
    return update_count
# Define function to call API endpoint to get quizzes
def api_get_quizzes(*, timeout=30):
    """Fetch the quizzes from the API's GET /quizzes endpoint.

    Args:
        timeout: Seconds handed to ``requests.get`` as its ``timeout``.
            Pass ``None`` to wait indefinitely (the behaviour before this
            parameter existed).

    Returns:
        The "quizzes" value of the JSON response when the endpoint answers
        with status 200. ``None`` when the status is not 200, the key is
        absent, or an error occurred. Errors are printed, never raised.
    """
    # Default result when the call does not succeed
    quizzes = None
    try:
        # Send GET request to /quizzes endpoint
        api_response = requests.get(
            f"{API_BASE_URL}/quizzes",
            # Without a timeout a stalled server would block the caller forever
            timeout=timeout,
        )
        # Only a 200 response carries usable quizzes
        if api_response.status_code == 200:
            quizzes = api_response.json().get("quizzes", None)
    except requests.exceptions.RequestException as e:
        # Report (do not propagate) connection failures and timeouts
        print(f"An error occurred with the Movie Recommender API: {e}")
    except Exception as e:
        # Boundary handler, e.g. a response body that is not a JSON object
        print(f"An unexpected error occurred: {e}")
    # Return quizzes or None if call to endpoint unsuccessful
    return quizzes
# Define helper function to map quiz responses into list of dictionary items
def map_quiz_responses(quiz_responses):
    """Wrap each quiz response in a single-key dictionary.

    Args:
        quiz_responses: Iterable of response values; each one becomes the
            "quiz_prompt_option_id" value of its own dictionary.

    Returns:
        A new list with one ``{"quiz_prompt_option_id": value}`` dictionary
        per input item, in input order. An empty iterable gives ``[]``.
    """
    # A comprehension replaces the manual append loop (same order, same items)
    return [
        {"quiz_prompt_option_id": quiz_response}
        for quiz_response in quiz_responses
    ]
# Define function to call API endpoint to add user's quiz responses
def api_add_user_quiz_responses(user_id, quiz_id, quiz_responses, *, timeout=30):
    """Submit a user's quiz responses to /user/{user_id}/quiz/{quiz_id}.

    Args:
        user_id: Interpolated into the endpoint path.
        quiz_id: Interpolated into the endpoint path.
        quiz_responses: Iterable of response values; passed through
            ``map_quiz_responses`` and sent as "quiz_responses" in the
            JSON body.
        timeout: Seconds handed to ``requests.post`` as its ``timeout``.
            Pass ``None`` to wait indefinitely (the behaviour before this
            parameter existed).

    Returns:
        The "update_count" value of the JSON response when the endpoint
        answers with status 200. ``None`` when the status is not 200, the
        key is absent, or a request error occurred. Request errors are
        printed, never raised.
    """
    # Default result when the call does not succeed
    update_count = None
    # Body expected by the quiz endpoint. Built outside the try block, as in
    # the original, so a non-iterable quiz_responses still raises to the caller.
    quiz_response_data = {
        "quiz_responses": map_quiz_responses(quiz_responses),
    }
    try:
        # Send POST request to /user/{user_id}/quiz/{quiz_id} endpoint,
        # passing quiz_response_data in JSON format
        api_response = requests.post(
            f"{API_BASE_URL}/user/{user_id}/quiz/{quiz_id}",
            data=json.dumps(quiz_response_data),
            headers={"content-type": "application/json"},
            # Without a timeout a stalled server would block the caller forever
            timeout=timeout,
        )
        # Only a 200 response carries a usable update_count
        if api_response.status_code == 200:
            update_count = api_response.json().get("update_count", None)
    except requests.exceptions.RequestException as e:
        # Report (do not propagate) connection failures and timeouts
        print(f"An error occurred with the Movie Recommender API: {e}")
    except Exception as e:
        # Boundary handler, e.g. unserialisable input or a non-object JSON body
        print(f"An unexpected error occurred: {e}")
    # Return update_count or None if call to endpoint unsuccessful
    return update_count
# Define function to call API endpoint to get user's movie recommendations
def api_get_user_movie_recommendations(user_id, *, timeout=30):
    """Fetch movie recommendations from /user/{user_id}/movie/recommendations.

    Args:
        user_id: Interpolated into the endpoint path.
        timeout: Seconds handed to ``requests.get`` as its ``timeout``.
            Pass ``None`` to wait indefinitely (the behaviour before this
            parameter existed).

    Returns:
        The "movies" value of the JSON response when the endpoint answers
        with status 200. ``None`` when the status is not 200, the key is
        absent, or an error occurred. Errors are printed, never raised.
    """
    # Default result when the call does not succeed
    movies = None
    try:
        # Send GET request to /user/{user_id}/movie/recommendations endpoint
        api_response = requests.get(
            f"{API_BASE_URL}/user/{user_id}/movie/recommendations",
            # Without a timeout a stalled server would block the caller forever
            timeout=timeout,
        )
        # Only a 200 response carries usable movies
        if api_response.status_code == 200:
            movies = api_response.json().get("movies", None)
    except requests.exceptions.RequestException as e:
        # Report (do not propagate) connection failures and timeouts
        print(f"An error occurred with the Movie Recommender API: {e}")
    except Exception as e:
        # Boundary handler, e.g. a response body that is not a JSON object
        print(f"An unexpected error occurred: {e}")
    # Return movies or None if call to endpoint unsuccessful
    return movies