-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgoogle_api.py
243 lines (195 loc) · 9.02 KB
/
google_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
import asyncio
import os.path
import cv2
from tenacity import retry, wait_random_exponential, stop_after_attempt
import numpy as np
import requests
from google.auth.exceptions import RefreshError
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from config import config
# OAuth scope requested when authorizing: the read-only Photos Library scope.
SCOPES = ['https://www.googleapis.com/auth/photoslibrary.readonly']
def get_token():
    """Return the access token of the current Google credentials.

    Credentials cached in ``token.json`` are reused when that file exists and
    the credentials are valid. Otherwise they are refreshed (when expired and a
    refresh token is present) or obtained again through the installed-app
    authorization flow, and then written back to ``token.json``.

    Returns:
        The ``token`` attribute of the resulting credentials.

    Raises:
        FileNotFoundError: If ``client_secret.json`` does not exist.
    """
    if not os.path.exists("client_secret.json"):
        raise FileNotFoundError("Google Project credentials not found. Create them and save them as client_secret.json")

    def _run_consent_flow():
        # Run the installed-app authorization flow on a local server (port=0).
        consent_flow = InstalledAppFlow.from_client_secrets_file(
            "client_secret.json", SCOPES
        )
        return consent_flow.run_local_server(port=0)

    # token.json holds the user's access and refresh tokens; it is written
    # below once an authorization or refresh has completed.
    cached = None
    if os.path.exists("token.json"):
        cached = Credentials.from_authorized_user_file("token.json", SCOPES)

    # Fast path: usable credentials need no refresh and no rewrite of the file.
    if cached and cached.valid:
        return cached.token

    refreshable = cached and cached.expired and cached.refresh_token
    if refreshable:
        try:
            cached.refresh(Request())
        except RefreshError:
            # The stored token can no longer be refreshed: discard it and
            # ask the user to log in again.
            os.remove("token.json")
            cached = _run_consent_flow()
    else:
        cached = _run_consent_flow()

    # Persist the credentials for the next run.
    with open("token.json", "w") as token_file:
        token_file.write(cached.to_json())
    return cached.token
def google_api_media_search(page_token, date_ranges, album_id):
    """Request one page of ``mediaItems:search`` and return the image ids on it.

    Args:
        page_token: Page token sent as ``pageToken`` in the request body.
        date_ranges: Value sent as the date filter ``ranges``; only used when
            ``album_id`` is falsy.
        album_id: When truthy, the search is restricted to this album and no
            date filter is sent.

    Returns:
        A ``(photo_ids, next_page_token)`` tuple. ``photo_ids`` holds the ids
        of the items whose ``mimeType`` starts with ``image/``. When the
        response has no media items the result is ``([], None)``.
    """
    # The request carries either an album id or a date filter, never both.
    if album_id:
        selector = {"albumId": album_id}
    else:
        selector = {"filters": {"dateFilter": {"ranges": date_ranges}}}
    body = {"pageSize": 100, "pageToken": page_token, **selector}

    auth_header = {"Authorization": f"Bearer {get_token()}"}
    reply = requests.post(
        "https://photoslibrary.googleapis.com/v1/mediaItems:search",
        headers=auth_header,
        json=body,
        timeout=30,
    ).json()

    items = reply.get("mediaItems", None)
    if not items:
        return [], None

    # Keep only still images; other media types are skipped.
    image_ids = []
    for item in items:
        if item["mimeType"].startswith("image/"):
            image_ids.append(item["id"])
    return image_ids, reply.get("nextPageToken")
def google_api_album_search(page_token, shared_albums, album_names):
    """Request one page of albums and return the ids of the wanted ones.

    Args:
        page_token: Page token sent as the ``pageToken`` query parameter.
        shared_albums: When truthy the ``sharedAlbums`` endpoint is queried,
            otherwise the ``albums`` endpoint.
        album_names: Album titles to keep. The single-element list ``["ALL"]``
            selects every album on the page.

    Returns:
        A ``(album_ids, next_page_token)`` tuple. ``album_ids`` is empty when
        the page lists no albums.

    Raises:
        requests.HTTPError: If the API answers with an error status.
    """
    endpoint = "sharedAlbums" if shared_albums else "albums"
    response = requests.get(
        f"https://photoslibrary.googleapis.com/v1/{endpoint}",
        params={
            "pageSize": 50,
            "pageToken": page_token,
        },
        headers={
            "Authorization": f"Bearer {get_token()}",
        },
        timeout=30,
    )
    # Surface HTTP failures explicitly so that an error payload is not
    # mistaken for "this account has no albums" below.
    response.raise_for_status()
    response_json = response.json()

    # The list key is absent when there is nothing to list, so default to an
    # empty list instead of raising KeyError.
    albums = response_json.get(endpoint, [])
    next_page_token = response_json.get("nextPageToken")

    if len(album_names) == 1 and album_names[0] == "ALL":
        album_ids = [album["id"] for album in albums]
    else:
        album_ids = [album["id"] for album in albums if album.get("title", None) in album_names]
    return album_ids, next_page_token
def _collect_all_pages(fetch_page, label):
    """Call ``fetch_page`` until no next-page token is returned; return all ids.

    Args:
        fetch_page: Callable taking a page token and returning an
            ``(ids, next_page_token)`` tuple. The first call receives ``""``.
        label: Text inserted into the progress message that is printed before
            every page after the first one.

    Returns:
        A set with every id returned across all pages.
    """
    collected = set()
    ids, next_page_token = fetch_page("")
    collected.update(ids)
    page = 1
    while next_page_token:
        print(f"Getting Google Photos {label} page {page}...")
        ids, next_page_token = fetch_page(next_page_token)
        collected.update(ids)
        page += 1
    return collected


def get_all_media_items():
    """Collect the ids of all selectable photos and store them on disk.

    The selection is read from ``config['photo_selection']``: the optional
    ``albums`` and ``shared_albums`` name lists and the optional ``ranges``
    date filter. Photo ids found through the albums and through the date
    ranges are merged without duplicates and written, one per line, to
    ``all_photo_ids.txt``.
    """
    selection = config['photo_selection']

    # Resolve the configured album names (own and shared) to album ids.
    all_album_ids = set()
    albums = selection.get('albums', None)
    if albums:
        all_album_ids |= _collect_all_pages(
            lambda token: google_api_album_search(page_token=token, shared_albums=False, album_names=albums),
            "albums",
        )

    shared_albums = selection.get('shared_albums', None)
    if shared_albums:
        all_album_ids |= _collect_all_pages(
            lambda token: google_api_album_search(page_token=token, shared_albums=True, album_names=shared_albums),
            "shared albums",
        )

    # Gather the photos contained in every selected album.
    all_photo_ids = set()
    for album_id in all_album_ids:
        all_photo_ids |= _collect_all_pages(
            lambda token, album_id=album_id: google_api_media_search(page_token=token, date_ranges=[], album_id=album_id),
            "from album",
        )

    # Gather the photos matching the date ranges. The key is optional, like
    # the album keys above, so a config that only lists albums is accepted.
    date_ranges = selection.get('ranges')
    if date_ranges:
        all_photo_ids |= _collect_all_pages(
            lambda token: google_api_media_search(page_token=token, date_ranges=date_ranges, album_id=None),
            "from date ranges",
        )

    with open("all_photo_ids.txt", "w") as f:
        f.writelines(f"{photo_id}\n" for photo_id in all_photo_ids)
    print("All photo ids have been retrieved and stored.")
@retry(wait=wait_random_exponential(min=3, max=20), stop=stop_after_attempt(3))
def download_photo(photo_name, photo_id, server_state):
    """Download one photo to ``photos/<photo_name>.jpg``.

    The media item is looked up first to obtain its ``baseUrl`` and
    ``productUrl``; the image bytes are then fetched from ``<baseUrl>=d``.
    The ``retry`` decorator re-runs the whole function on failure, for at
    most three attempts.

    Args:
        photo_name: File name (without extension) used inside ``photos/``.
        photo_id: Id of the media item to download.
        server_state: Mapping with a ``'photo_urls'`` dict in which the
            product URL is recorded under ``"<photo_name>.jpg"``. May be
            ``None``, in which case nothing is recorded.

    Raises:
        ConnectionError: If either HTTP request does not answer with 200.
    """
    response = requests.get(
        f"https://photoslibrary.googleapis.com/v1/mediaItems/{photo_id}",
        headers={
            "Authorization": f"Bearer {get_token()}",
        },
        timeout=30,
    )
    if response.status_code != 200:
        print(f"Error {response.status_code} - {response.reason} getting photo {photo_id}, trying again...")
        raise ConnectionError(f"Error while getting photo {photo_id}")

    data = response.json()
    base_url = data["baseUrl"]
    product_url = data["productUrl"]

    # Store the product URL in the server state. Callers may run without a
    # server state (download_random_photos defaults it to None), so skip the
    # bookkeeping instead of failing with a TypeError on every attempt.
    if server_state is not None:
        server_state['photo_urls'][f"{photo_name}.jpg"] = product_url

    response = requests.get(f"{base_url}=d", timeout=30)
    if response.status_code != 200:
        print(f"Error {response.status_code} - {response.reason} downloading photo {photo_id}, trying again...")
        raise ConnectionError(f"Error while getting photo {photo_id}")

    # Store the image
    with open(f"photos/{photo_name}.jpg", "wb") as f:
        f.write(response.content)
async def download_random_photos(number_of_photos, photo_names, refresh_photos=False, server_state=None):
    """Download randomly chosen photos into the ``photos`` directory.

    The candidate ids are read from ``all_photo_ids.txt``; that file is
    (re)generated first when it is missing or ``refresh_photos`` is true.
    Each downloaded file is checked with ``cv2.imread``; an unreadable file
    is replaced by another randomly drawn photo, up to three times.

    Args:
        number_of_photos: How many photos to download.
        photo_names: File names (without extension), one per photo.
        refresh_photos: Force regeneration of ``all_photo_ids.txt``.
        server_state: Passed through unchanged to ``download_photo``.

    Raises:
        ValueError: If ``photo_names`` does not have ``number_of_photos``
            entries, or fewer ids than requested are available.
        ConnectionError: If no readable image could be obtained for a name.
    """
    if len(photo_names) != number_of_photos:
        raise ValueError("The number of photo names should be equal to the number of photos")

    if not os.path.exists("all_photo_ids.txt") or refresh_photos:
        get_all_media_items()
    with open("all_photo_ids.txt", "r") as f:
        all_photo_ids = f.read().splitlines()
    if len(all_photo_ids) < number_of_photos:
        raise ValueError("There are not enough photos to select from.")

    # Select random photo ids, without repetition.
    random_photos_ids = np.random.choice(all_photo_ids, number_of_photos, replace=False)

    # Create directory for photos if it doesn't exist
    os.makedirs('photos', exist_ok=True)

    # Download the photos
    for photo_name, photo_id in zip(photo_names, random_photos_ids):
        photo_path = f"photos/{photo_name}.jpg"
        download_photo(photo_name, photo_id, server_state)

        # Test that the image was downloaded correctly. Try 3 times.
        img = cv2.imread(photo_path)
        tries = 0
        while img is None and tries < 3:
            # Draw a single scalar id. Asking for size=1 would return a
            # one-element array that formats as "['<id>']" inside the URL.
            new_photo_id = np.random.choice(all_photo_ids)
            print(f"Invalid photo {photo_id}. Trying with photo {new_photo_id}...")
            download_photo(photo_name, new_photo_id, server_state)
            img = cv2.imread(photo_path)
            tries += 1
        if img is None:
            raise ConnectionError("Error downloading photos. Couldn't find a suitable photo.")
if __name__ == "__main__":
    # Manual smoke run: fetch three random photos as photos/0.jpg .. 2.jpg.
    # download_photo writes into server_state['photo_urls'], so provide that
    # mapping explicitly rather than relying on the default of None.
    asyncio.run(download_random_photos(3, ["0", "1", "2"], server_state={"photo_urls": {}}))