-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path_stations.py
279 lines (239 loc) · 8.76 KB
/
_stations.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
"""
Station utility functions.
"""
import meerschaum as mrsm
from meerschaum.utils.typing import Dict, List, Any, Optional
### Base endpoint of the weather.gov stations API; station IDs are appended as a path segment.
STATIONS_BASE_URL: str = "https://api.weather.gov/stations"
### In-memory cache of station metadata keyed by station ID (filled by `get_station_info()`).
_stations_info_cache: Dict[str, Any] = {}
def get_station_info(stationID: str) -> Dict[str, Any]:
    """
    Fetch the metadata for a station.

    Successful lookups are stored in `_stations_info_cache`, so a station
    that was already fetched is returned without another request.

    Parameters
    ----------
    stationID: str
        The station identifier to look up (appended to `STATIONS_BASE_URL`).

    Returns
    -------
    A dictionary with the key 'name' and, when the response contains one,
    the key 'geometry'. If the lookup fails for any reason, a warning is
    emitted and an empty dictionary is returned (failures are not cached).
    """
    from meerschaum.utils.warnings import warn
    import requests

    cached_info = _stations_info_cache.get(stationID)
    if cached_info:
        return cached_info

    station_info: Dict[str, Any] = {}
    url = STATIONS_BASE_URL + "/" + stationID

    ### Network failures are reported the same way as HTTP errors instead of
    ### propagating; the timeout keeps a stalled connection from hanging forever.
    try:
        response = requests.get(url, timeout=30)
    except requests.exceptions.RequestException as e:
        warn(
            f"Unable to get information for station '{stationID}':\n{e}",
            stack=False,
        )
        return station_info

    ### A `requests.Response` is falsy for 4xx / 5xx status codes.
    if not response:
        warn(
            f"Unable to get information for station '{stationID}':\n{response.text}",
            stack=False,
        )
        return station_info

    ### `Response.json()` raises a `ValueError` subclass on a non-JSON body.
    try:
        info = response.json()
    except ValueError:
        warn(
            f"Unable to get information for station '{stationID}':\n{response.text}",
            stack=False,
        )
        return station_info

    ### The geometry is optional; the name is required.
    geo = info.get('geometry') if isinstance(info, dict) else None
    try:
        name = info['properties']['name'].rstrip()
    except (KeyError, TypeError, AttributeError):
        warn(f"Unable to fetch the name for station '{stationID}'.", stack=False)
        return station_info

    station_info['name'] = name
    if geo is not None:
        station_info['geometry'] = geo
    _stations_info_cache[stationID] = station_info
    return station_info
def ask_for_stations(pipe, debug: bool = False) -> Dict[str, Any]:
    """
    Prompt the user for stations and return a dictionary.

    The user is asked repeatedly for a station ID or a two-letter state
    abbreviation until an empty value is entered. Confirming a state
    abbreviation replaces the selection with every station returned by
    `get_state_stations()` and ends the prompt loop. Each individual station
    is looked up via the API and the user may override its label.

    Parameters
    ----------
    pipe:
        The pipe the stations will be registered to. Only its `parameters`
        mapping is touched here (when the user rejects the final selection).

    debug: bool, default False
        Passed through to the recursive call when the user starts over.

    Returns
    -------
    A dictionary mapping station IDs to dictionaries with the key 'name'
    and, when available, 'geometry'.
    """
    import requests
    from meerschaum.utils.warnings import warn, info
    from meerschaum.utils.prompt import yes_no, prompt
    from meerschaum.utils.formatting import pprint

    instructions = (
        "\n"
        "Visit https://www.weather.gov and use the local forecast search tool\n"
        "on the top left to find specific station IDs (e.g. 'KATL' for Atlanta).\n"
        "To fetch all stations from a state, enter the state abbreviation\n"
        "(e.g. 'GA' for Georgia).\n"
    )
    info(instructions)

    stations = {}
    while True:
        stationID = prompt("Enter station ID or state abbreviation, empty to stop: ", icon=False)
        if stationID == '':
            break

        ### Two characters are treated as a state abbreviation.
        if len(stationID) == 2:
            state_abbrev = stationID
            if yes_no(
                f"Are you sure you want to fetch from all stations in the state '{state_abbrev}'? " +
                "This will be very slow!"
            ):
                stations = get_state_stations(state_abbrev)
                break

        url = STATIONS_BASE_URL + "/" + stationID
        ### A network failure only skips this entry rather than aborting the prompt.
        try:
            response = requests.get(url, timeout=30)
        except requests.exceptions.RequestException as e:
            warn(
                f"Unable to get information for station '{stationID}':\n{e}",
                stack=False,
            )
            continue

        ### A `requests.Response` is falsy for 4xx / 5xx status codes.
        if not response:
            warn(
                f"Unable to get information for station '{stationID}':\n{response.text}",
                stack=False,
            )
            continue

        ### NOTE: Named `station_json` so the imported `info()` printer is not shadowed.
        try:
            station_json = response.json()
        except ValueError:
            warn(
                f"Unable to get information for station '{stationID}':\n{response.text}",
                stack=False,
            )
            continue

        geo = station_json.get('geometry') if isinstance(station_json, dict) else None
        try:
            name = station_json['properties']['name'].rstrip()
        except (KeyError, TypeError, AttributeError):
            warn(f"Unable to fetch name for station '{stationID}'. Skipping...", stack=False)
            continue

        if not yes_no(f"Is '{name}' a good label for station '{stationID}'?"):
            name = prompt(f"New label for station '{stationID}': ", icon=False)

        stations[stationID] = {'name': name}
        if geo is not None:
            stations[stationID]['geometry'] = geo

    pprint(stations)
    if not yes_no(f"Would you like to register the above stations to pipe '{pipe}'?"):
        print("Resetting stations and starting over...")
        ### `setdefault` avoids a KeyError when the pipe has no 'noaa' parameters yet.
        pipe.parameters.setdefault('noaa', {})['stations'] = {}
        return ask_for_stations(pipe, debug=debug)

    return stations
def get_stations(pipe: 'mrsm.Pipe') -> Dict[str, Any]:
    """
    Return the stations dictionary.

    The stations are read from `pipe.parameters['noaa']['stations']`.
    A list of station IDs is converted into a dictionary keyed by ID.
    Any station without a 'name' is looked up with `get_station_info()`;
    when that happens, the updated dictionary is written back to the
    pipe's parameters and `pipe.edit()` is called.

    Parameters
    ----------
    pipe: mrsm.Pipe
        The pipe whose parameters hold the stations.

    Returns
    -------
    A dictionary mapping station IDs to their metadata dictionaries.
    An empty dictionary is returned when no stations are configured.
    """
    noaa_params = pipe.parameters.get('noaa') or {}
    stations_dict = noaa_params.get('stations') or {}
    if isinstance(stations_dict, list):
        stations_dict = {stationID: {} for stationID in stations_dict}

    edit = False
    ### Iterate over a snapshot because entries are replaced inside the loop.
    for stationID, station_info in list(stations_dict.items()):
        if not station_info or 'name' not in station_info:
            stations_dict[stationID] = get_station_info(stationID)
            edit = True

    if edit:
        pipe.parameters['noaa']['stations'] = stations_dict
        pipe.edit()

    ### Return the normalized dictionary rather than re-reading the parameters,
    ### so a stored (empty) list is never handed back to the caller.
    return stations_dict
def get_state_stations(
    state_abbrev: str,
    debug: bool = False
) -> dict:
    """
    Parse every station in a state.

    Parameters
    ----------
    state_abbrev: str
        The state abbreviation, sent as the `state` query parameter.

    debug: bool, default False
        Currently unused; kept for interface compatibility.

    Returns
    -------
    A dictionary mapping station IDs to dictionaries with the keys 'name'
    and 'geometry' (which may be `None`). An empty dictionary is returned
    (after a warning) when the request fails or contains no 'features'.
    """
    from meerschaum.utils.warnings import warn
    import requests

    stations = {}
    print(f"Retrieving stations for state '{state_abbrev}'...")

    ### A failed request or a non-JSON body is reported as "no stations"
    ### rather than raising out of the interactive prompt.
    try:
        response = requests.get(
            STATIONS_BASE_URL,
            params={'state': state_abbrev},
            timeout=30,
        )
        d = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        warn(f"No stations retrieved for state '{state_abbrev}':\n{e}", stack=False)
        return stations

    if not isinstance(d, dict) or 'features' not in d:
        warn(f"No stations retrieved for state '{state_abbrev}'.", stack=False)
        return stations

    for feature in d['features']:
        ### The station ID is the last segment of the feature's 'id' URL.
        try:
            stationID = feature['id'].split('/stations/')[-1]
        except (KeyError, TypeError, AttributeError):
            ### Without an ID there is nothing to report or store.
            continue

        try:
            name = feature['properties']['name'].strip()
        except (KeyError, TypeError, AttributeError):
            warn(
                f"Could not determine name for station '{stationID}'. Skipping...",
                stack=False,
            )
            continue

        stations[stationID] = {
            'name': name,
            'geometry': feature.get('geometry', None),
        }

    return stations
def _utc_isoformat(dt: Optional['datetime.datetime']) -> Optional[str]:
    """Return `dt` as a UTC ISO-8601 string (naive values are assumed to be UTC)."""
    import datetime
    if dt is None:
        return None
    if dt.tzinfo is None:
        return dt.replace(tzinfo=datetime.timezone.utc).isoformat()
    ### Convert aware datetimes instead of relabeling them, which would shift the instant.
    return dt.astimezone(datetime.timezone.utc).isoformat()


def _keep_most_common_length(columns: Dict[str, List[Any]]) -> Dict[str, List[Any]]:
    """Keep only the columns whose length is the most common length among all columns."""
    import collections
    length_counts = collections.Counter(len(values) for values in columns.values())
    if not length_counts:
        return {}
    ### `max` returns the first length with the highest count (first-seen wins ties).
    target_length = max(length_counts, key=length_counts.get)
    return {
        col: values
        for col, values in columns.items()
        if len(values) == target_length
    }


def fetch_station_data(
    stationID: str,
    begin: Optional['datetime.datetime'] = None,
    end: Optional['datetime.datetime'] = None,
) -> Optional[Dict[str, List[Any]]]:
    """
    Fetch JSON for a given stationID from NOAA and parse it into a dictionary of columns.

    Parameters
    ----------
    stationID: str
        The station whose observations should be fetched.

    begin: Optional[datetime.datetime], default None
        If provided, sent as the `start` query parameter (as a UTC ISO-8601 string).

    end: Optional[datetime.datetime], default None
        If provided, sent as the `end` query parameter (as a UTC ISO-8601 string).

    Returns
    -------
    A dictionary mapping column names to lists of values (one value per
    observation), or `None` if the request fails, the body is not valid JSON,
    or the payload has no 'features'.

    Columns whose length differs from the most common column length are
    dropped so that every returned list has the same length.
    """
    from meerschaum.utils.warnings import warn
    import json
    import requests

    start = _utc_isoformat(begin)
    end = _utc_isoformat(end)

    info_dict = get_station_info(stationID)
    ### `get_station_info()` returns an empty dictionary on failure,
    ### so the name must not be accessed with `info_dict['name']`.
    station_name = info_dict.get('name', None)
    print(
        (f"{start} - {end}\n" if start else '')
        + f"Fetching data for station '{stationID}' ({station_name})..."
    )

    url = f"https://api.weather.gov/stations/{stationID}/observations/"
    response = None
    try:
        response = requests.get(url, params={"start": start, "end": end}, timeout=30)
        data = json.loads(response.text)
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"\nFailed to parse JSON with exception: {e}", flush=True)
        if response is not None:
            print("Received text:\n" + response.text)
        return None
    print(f"Done fetching data for station '{stationID}' ({station_name}).", flush=True)

    if not isinstance(data, dict) or 'features' not in data:
        warn(
            f"Failed to fetch data for station '{stationID}' ({station_name}):\n" + str(data),
            stack=False,
        )
        return None

    ### Build a dictionary of columns from the JSON response,
    ### e.g. { 'col1' : [ 1, 2, 3 ], 'col2' : [ 4, 5, 6 ] }
    d = {'location': [], 'geometry': [], 'cloudLayers': []}
    for record in data['features']:
        d['location'].append(station_name)
        d['geometry'].append(info_dict.get('geometry', {}))

        for col, v in (record.get('properties') or {}).items():
            ### Skip JSON-LD metadata keys such as '@id' and '@type'.
            if col.startswith('@'):
                continue

            ### We could just use the stationID provided, but it's given in the JSON
            ### so we might as well use it.
            if col == 'station' and isinstance(v, str):
                val = v.split('/')[-1]
            elif isinstance(v, dict) and 'value' in v:
                val = v['value']
            else:
                val = v

            ### If possible, append units to column name.
            if isinstance(v, dict) and isinstance(v.get('unitCode'), str):
                col += " (" + v['unitCode'].replace('wmoUnit:', '') + ")"

            if col == 'cloudLayers' and val is None:
                val = []

            d.setdefault(col, []).append(val)

    ### Normalize the lengths.
    return _keep_most_common_length(d)