-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathReportState.py
59 lines (46 loc) · 1.87 KB
/
ReportState.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import time
import json
from flask import current_app
from google.auth import crypt, jwt
import requests
def generate_jwt(service_account):
signer = crypt.RSASigner.from_string(service_account['private_key'])
now = int(time.time())
payload = {
'iat': now,
'exp': now + 3600,
'aud': 'https://accounts.google.com/o/oauth2/token',
'iss': service_account['client_email'],
'scope': 'https://www.googleapis.com/auth/homegraph'
}
return jwt.encode(signer, payload)
def get_access_token(signed_jwt):
url = 'https://accounts.google.com/o/oauth2/token'
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
data = 'grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer&assertion=' + signed_jwt
response = requests.post(url, headers=headers, data=data)
if response.status_code == requests.codes.ok:
token_data = json.loads(response.text)
return token_data['access_token']
response.raise_for_status()
return 'ERROR'
def report_state(access_token, report_state_file):
url = 'https://homegraph.googleapis.com/v1/devices:reportStateAndNotification'
headers = {
'X-GFE-SSL': 'yes',
'Authorization': 'Bearer ' + access_token
}
data = report_state_file
response = requests.post(url, headers=headers, json=data)
print('Response: ' + response.text)
return response.status_code == requests.codes.ok
def main(report_state_file):
service_account = current_app.config['SERVICE_ACCOUNT_DATA']
print('By ReportState')
signed_jwt = generate_jwt(service_account).decode("utf-8") # Decode
access_token = get_access_token(signed_jwt)
success = report_state(access_token, report_state_file)
if success:
print('Report State has been done successfully.')
else:
print('Report State failed. Please check the log above.')