-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprovider.py
83 lines (71 loc) · 3.24 KB
/
provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import logging
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
import requests
class ProviderType(Enum):
    """Identifiers of the supported weather data providers."""

    # NOTE: "WUNDERGROUD" is a historical misspelling. It stays the canonical
    # member so existing references and ``.name`` lookups keep working.
    WUNDERGROUD = "wunderground"
    # Correctly spelled alias: sharing the value makes it the same member
    # object, and aliases are skipped when iterating over the enum.
    WUNDERGROUND = "wunderground"
    OPENWEATHER = "openweather"
class WeatherProvider(ABC):
    """Abstract base class for weather back-ends.

    Instances carry the API key, the unit setting and a ``conditions``
    dictionary that starts out as ``{"valid": False}``.
    """

    def __init__(self, api_key, units):
        """Remember the API key and units and reset ``conditions``."""
        self.units = units
        self.api_key = api_key
        # Nothing has been fetched yet, so the conditions start out invalid.
        self.conditions = dict(valid=False)

    @abstractmethod
    def get_weather(self, latitude, longitude):
        """Fetch the weather for the given coordinates.

        Abstract: concrete subclasses must override this method.
        """
        return None
class WundergroundProvider(WeatherProvider):
    """Weather provider backed by the api.weather.com PWS endpoints.

    ``get_weather`` first asks for personal weather stations (``product=pws``)
    near the given coordinates, then requests the current observation of the
    first station in the response. Results are stored in ``self.conditions``
    under the keys ``city``, ``temperature``, ``humidity``, ``last_update``
    and ``valid``.
    """

    # Seconds to wait for each HTTP request. Without a timeout ``requests``
    # may block forever on an unresponsive server.
    _REQUEST_TIMEOUT = 10

    # The observation payload nests the unit-dependent readings (such as
    # ``temp``) under a key that depends on the ``units`` code of the request.
    _UNIT_GROUPS = {
        "m": "metric",
        "e": "imperial",
        "h": "uk_hybrid",
        "s": "metric_si",
    }

    def __init__(self, api_key, units):
        """Create the provider.

        Args:
            api_key: API key sent as ``apiKey`` with every request.
            units: ``"metric"`` or ``"imperial"`` are translated to the
                single-letter codes ``"m"`` / ``"e"``; any other value is
                passed through to the API unchanged.
        """
        if units == "metric":
            units = "m"
        elif units == "imperial":
            units = "e"
        super().__init__(api_key, units)
        self.base_url = "https://api.weather.com"

    def _mark_failed(self, status_code):
        """Log a non-200 response and flag the stored conditions as invalid."""
        logging.debug("Failed to get weather data: status code is %s", status_code)
        self.conditions['valid'] = False

    def get_weather(self, latitude, longitude):
        """Refresh ``self.conditions`` for the given coordinates.

        Never raises: any failure (missing API key, non-200 response, network
        error, unexpected payload) is logged and leaves
        ``self.conditions['valid']`` set to ``False``.

        Args:
            latitude: Latitude placed in the ``geocode`` query parameter.
            longitude: Longitude placed in the ``geocode`` query parameter.
        """
        if self.api_key is None:
            # Without a key the request cannot succeed; skip the network call.
            logging.debug("Failed to get weather data: no API key configured")
            self.conditions['valid'] = False
            return
        try:
            response = requests.get(
                f"{self.base_url}/v3/location/near?geocode={latitude},{longitude}&product=pws&format=json&apiKey={self.api_key}",
                timeout=self._REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                self._mark_failed(response.status_code)
                return
            location = response.json()["location"]
            # Use the first station of the response (presumably the nearest
            # one -- TODO confirm against the API documentation).
            city = location["stationName"][0]
            pws = location["stationId"][0]
            self.conditions['city'] = city

            response = requests.get(
                f"{self.base_url}/v2/pws/observations/current?stationId={pws}&format=json&units={self.units}&apiKey={self.api_key}",
                timeout=self._REQUEST_TIMEOUT,
            )
            if response.status_code != 200:
                self._mark_failed(response.status_code)
                return
            observation = response.json()["observations"][0]
            # Read the temperature from the group matching the requested
            # units; fall back to "metric" for unknown unit codes.
            group = self._UNIT_GROUPS.get(self.units, "metric")
            self.conditions['temperature'] = observation[group]["temp"]
            self.conditions['humidity'] = observation["humidity"]
            self.conditions['last_update'] = datetime.now()
            self.conditions['valid'] = True
        except Exception:
            # Best-effort refresh: log the traceback and report invalid data
            # instead of propagating the error to the caller.
            logging.exception("Failed to get weather data")
            self.conditions['valid'] = False
class OpenweatherProvider(WeatherProvider):
    """Weather provider backed by the OpenWeatherMap current-weather endpoint.

    ``get_weather`` stores its results in ``self.conditions`` under the keys
    ``city``, ``temperature``, ``humidity``, ``last_update`` and ``valid``.
    """

    # Seconds to wait for the HTTP request. Without a timeout ``requests``
    # may block forever on an unresponsive server.
    _REQUEST_TIMEOUT = 10

    def __init__(self, api_key, units):
        """Create the provider.

        Args:
            api_key: API key sent as ``appid`` with every request.
            units: Value sent unchanged as the ``units`` query parameter.
        """
        super().__init__(api_key, units)
        self.base_url = "https://api.openweathermap.org/data/2.5/weather"

    def get_weather(self, latitude, longitude):
        """Refresh ``self.conditions`` for the given coordinates.

        Never raises: any failure (missing API key, non-200 response, network
        error, unexpected payload) is logged and leaves
        ``self.conditions['valid']`` set to ``False``.

        Args:
            latitude: Latitude sent as the ``lat`` query parameter.
            longitude: Longitude sent as the ``lon`` query parameter.
        """
        if self.api_key is None:
            # Without a key the request cannot succeed; skip the network call.
            logging.debug("Failed to get weather data: no API key configured")
            self.conditions['valid'] = False
            return
        try:
            response = requests.get(
                f"{self.base_url}?lat={latitude}&lon={longitude}&units={self.units}&appid={self.api_key}",
                timeout=self._REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                data = response.json()
                self.conditions['city'] = data["name"]
                self.conditions['temperature'] = data["main"]["temp"]
                self.conditions['humidity'] = data["main"]["humidity"]
                self.conditions['last_update'] = datetime.now()
                self.conditions['valid'] = True
            else:
                logging.debug("Failed to get weather data: status code is %s", response.status_code)
                self.conditions['valid'] = False
        except Exception:
            # Best-effort refresh: log the traceback and report invalid data
            # instead of propagating the error to the caller.
            logging.exception("Failed to get weather data")
            self.conditions['valid'] = False