import requests
from requests.auth import HTTPBasicAuth
import json
import pandas as pd
import os
import concurrent.futures
import logging
import sys

logger = logging.getLogger(__name__)
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
'''
NYC Open Data API resources used by this module:

Taxi zones for the boroughs of Manhattan and Brooklyn:
    https://data.cityofnewyork.us/resource/755u-8jsi.json?borough=Manhattan
    https://data.cityofnewyork.us/resource/755u-8jsi.json?borough=Brooklyn

Traffic speeds, updated multiple times per day:
    https://data.cityofnewyork.us/resource/i4gi-tjb9.json?borough=Manhattan
    https://data.cityofnewyork.us/resource/i4gi-tjb9.json?borough=Brooklyn

Green Taxi Trip Data 2022:
    https://data.cityofnewyork.us/resource/8nfn-ifaj.json

Yellow Taxi Trip Data 2022:
    https://data.cityofnewyork.us/resource/qp3b-zxtp.json
'''
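# The resources above are Socrata (SODA) endpoints: rows are paged with the
# $limit and $offset query parameters, and $order pins a stable sort so that
# successive pages do not overlap. The helpers below build their request URLs
# from exactly those parameters, e.g.
#   https://data.cityofnewyork.us/resource/8nfn-ifaj.json?$offset=20000&$limit=10000&$order=lpep_pickup_datetime
# (lpep_pickup_datetime is an illustrative order column from the Green Taxi
# schema). Swapping the .json suffix for .csv returns the same rows as CSV,
# which is the form the line-streaming helper mt_nyc_opendata expects.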
def batch_api_call(base_url, auth, order, destination, limit=10000):
    '''
    Pull data from the NYC Open Data API in batches and append it to a csv file.
    Args:
        base_url (str): The base url of the api resource
        auth (object): requests HTTPBasicAuth object for access to the api
        order (str): The attribute name to order the data by (keeps paging stable)
        destination (str): The location to save the final data in a csv file
        limit (int): The number of rows to pull in each api call (default 10000)
    Returns:
        str: Number of bytes written and the file location
    '''
    header = True
    offset = 0
    url = base_url + '?$offset={offset}&$limit={limit}&$order={order}'
    while True:
        rsp = requests.get(url.format(offset=offset, limit=limit, order=order), auth=auth)
        if not rsp.ok:
            raise requests.RequestException(f'Request failed and returned status code {rsp.status_code}')
        records = json.loads(rsp.content)
        if len(records) == 0:
            # An empty page means every row has been pulled
            break
        df = pd.DataFrame(records)
        # Append this batch; only the first batch writes the column header
        df.to_csv(destination, mode='a', index=False, header=header)
        offset += limit
        header = False
    return f'{os.stat(destination).st_size} Bytes written to {destination}'
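# A minimal usage sketch for batch_api_call, assuming a Socrata key pair for
# data.cityofnewyork.us; the credential strings, order column, and output path
# are placeholders rather than values from this project:
#
#   auth = HTTPBasicAuth('API_KEY_ID', 'API_KEY_SECRET')
#   print(batch_api_call(
#       'https://data.cityofnewyork.us/resource/8nfn-ifaj.json',
#       auth,
#       order='lpep_pickup_datetime',
#       destination='green_taxi_2022.csv',
#   ))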
def batch_api_call_mp(base_url, auth, destination, limit=10000, concurrency=10):
    '''
    Pull data from the NYC Open Data API and append it to a csv file, sending
    multiple requests concurrently from a thread pool.
    Args:
        base_url (str): The base url of the api resource
        auth (object): requests HTTPBasicAuth object for access to the api
        destination (str): The location to save the final data in a csv file
        limit (int): The number of rows to pull in each api call (default 10000)
        concurrency (int): The number of requests that will be sent concurrently (default 10)
    Returns:
        str: Number of bytes written and the file location
    '''
    offset = 0
    url = base_url + '?$offset={offset}&$limit={limit}'
    # Pull the first page on its own so the csv header is written exactly once
    rsp = requests.get(url.format(offset=offset, limit=limit), auth=auth)
    if not rsp.ok:
        raise requests.RequestException(f'Request failed and returned status code {rsp.status_code}')
    df = pd.DataFrame(json.loads(rsp.content))
    df.to_csv(destination, mode='a', index=False, header=True)
    logger.info('First %d rows pulled and written', limit)
    offset += limit
    while True:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # One future per page in this batch of concurrent requests
            futures = [executor.submit(requests.get, url.format(offset=offset + limit * n, limit=limit), auth=auth)
                       for n in range(concurrency)]
            # as_completed yields futures in completion order, so rows within
            # a batch may be appended out of order
            for future in concurrent.futures.as_completed(futures):
                rsp = future.result()
                if not rsp.ok:
                    raise requests.RequestException(f'Request failed and returned status code {rsp.status_code} {rsp.reason}')
                df = pd.DataFrame(json.loads(rsp.content))
                df.to_csv(destination, mode='a', index=False, header=False)
            offset += limit * concurrency
            # The last future in the batch has the highest offset; an empty
            # response there means every row has been pulled
            last_res = futures.pop()
            logger.info('%d rows requested so far (%s)', offset, last_res.result().reason)
            if len(json.loads(last_res.result().content)) == 0:
                break
    return f'{os.stat(destination).st_size} Bytes written to {destination}'
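# batch_api_call_mp takes the same arguments minus the order column, since
# pages are addressed purely by offset. One caveat worth knowing: without
# $order, Socrata does not guarantee a stable row order between requests, so a
# dataset that updates mid-pull can yield duplicated or skipped rows. A sketch
# reusing the placeholder auth object from above:
#
#   print(batch_api_call_mp(
#       'https://data.cityofnewyork.us/resource/qp3b-zxtp.json',
#       auth,
#       destination='yellow_taxi_2022.csv',
#       limit=10000,
#       concurrency=10,
#   ))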
def mt_nyc_opendata(base_url, auth, destination, limit=10000, concurrency=10):
    '''
    Pull data from the NYC Open Data API and append it to a csv file, sending
    multiple requests concurrently from a thread pool. Unlike batch_api_call_mp,
    this streams raw response lines straight to disk instead of parsing each
    batch through pandas, so base_url should return csv; the first line of
    every response after the first is skipped as a repeated header row.
    Args:
        base_url (str): The base url of the api resource
        auth (object): requests HTTPBasicAuth object for access to the api
        destination (str): The location to save the final data in a csv file
        limit (int): The number of rows to pull in each api call (default 10000)
        concurrency (int): The number of requests that will be sent concurrently (default 10)
    Returns:
        bool: True once every row has been pulled (bytes written are logged)
    '''
    offset = 0
    url = base_url + '?$offset={offset}&$limit={limit}'
    # Pull the first page on its own so the header row is written exactly once
    rsp = requests.get(url.format(offset=offset, limit=limit), auth=auth)
    if not rsp.ok:
        raise requests.RequestException(f'Request failed and returned status code {rsp.status_code}')
    with open(destination, 'wb') as f:
        for row in rsp.iter_lines():
            # iter_lines strips the terminator, so add the newline back
            f.write(row + b'\n')
    logger.info('First %d rows pulled and written', limit)
    offset += limit
    while True:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(requests.get, url.format(offset=offset + limit * n, limit=limit), auth=auth)
                       for n in range(concurrency)]
            # Completion order is not submission order, so rows within a batch
            # may land in the file out of order
            for future in concurrent.futures.as_completed(futures):
                rsp = future.result()
                if not rsp.ok:
                    raise requests.RequestException(f'Request failed and returned status code {rsp.status_code} {rsp.reason}')
                header_skip = True
                with open(destination, 'ab') as f:
                    for row in rsp.iter_lines():
                        if header_skip:
                            # Drop this response's repeated header line
                            header_skip = False
                            continue
                        f.write(row + b'\n')
            offset += limit * concurrency
            # The last future has the highest offset; a response with only a
            # header line (or nothing at all) means every row has been pulled
            last_res = futures.pop()
            if len(list(last_res.result().iter_lines())) <= 1:
                break
    logger.info('%d Bytes written to %s', os.stat(destination).st_size, destination)
    return True
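# An illustrative entry point, guarded so that importing this module has no
# side effects. The environment variable names and output path are assumptions
# for this sketch, not part of the original project:
if __name__ == '__main__':
    auth = HTTPBasicAuth(os.environ['NYC_OPENDATA_KEY_ID'], os.environ['NYC_OPENDATA_KEY_SECRET'])
    # mt_nyc_opendata streams raw response lines, so point it at the .csv
    # form of the resource rather than .json
    mt_nyc_opendata(
        'https://data.cityofnewyork.us/resource/8nfn-ifaj.csv',
        auth,
        destination='green_taxi_2022.csv',
    )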