-
Notifications
You must be signed in to change notification settings - Fork 11
/
app-unicornhat.py
112 lines (82 loc) · 3.11 KB
/
app-unicornhat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import unicornhat as uh
import os, asyncio
from dotenv import load_dotenv
from azure.iot.device.aio import IoTHubDeviceClient, ProvisioningDeviceClient
from azure.iot.device import MethodResponse
load_dotenv()
id_scope = os.getenv('ID_SCOPE')
device_id = os.getenv('DEVICE_ID')
primary_key = os.getenv('PRIMARY_KEY')
uh.set_layout(uh.PHAT)
uh.brightness(1)
def set_colour(colour):
r = '0x' + colour[0:2]
g = '0x' + colour[2:4]
b = '0x' + colour[4:6]
r_value = int(r, 0)
g_value = int(g, 0)
b_value = int(b, 0)
print('Updating color: r =', r_value, ', g =', g_value, ', b =', b_value)
uh.set_all(r_value, g_value, b_value)
uh.show()
async def main():
# provision the device
async def register_device():
provisioning_device_client = ProvisioningDeviceClient.create_from_symmetric_key(
provisioning_host='global.azure-devices-provisioning.net',
registration_id=device_id,
id_scope=id_scope,
symmetric_key=primary_key,
)
return await provisioning_device_client.register()
results = await asyncio.gather(register_device())
registration_result = results[0]
# build the connection string
conn_str='HostName=' + registration_result.registration_state.assigned_hub + \
';DeviceId=' + device_id + \
';SharedAccessKey=' + primary_key
# The client object is used to interact with your Azure IoT Central.
device_client = IoTHubDeviceClient.create_from_connection_string(conn_str)
# connect the client.
print('Connecting')
await device_client.connect()
print('Connected')
# Get the current colour
twin = await device_client.get_twin()
print('Got twin: ', twin)
try:
desired_colour = twin['reported']['colour']
set_colour(desired_colour)
except Exception as ex:
print("Couldn't load twin,")
# listen for commands
async def command_listener(device_client):
while True:
method_request = await device_client.receive_method_request()
print('Call made to', method_request.name)
colour = method_request.payload
print('payload', colour)
if isinstance(colour, dict):
colour = colour['colour']
print('payload', colour)
set_colour(colour)
payload = {'result': True, 'data': colour}
method_response = MethodResponse.create_from_method_request(
method_request, 200, payload
)
await device_client.send_method_response(method_response)
# Write the colour back as a property
await device_client.patch_twin_reported_properties({'colour':colour})
# async loop that controls the lights
async def main_loop():
global mode
while True:
await asyncio.sleep(1)
listeners = asyncio.gather(command_listener(device_client))
await main_loop()
# Cancel listening
listeners.cancel()
# Finally, disconnect
await device_client.disconnect()
if __name__ == '__main__':
asyncio.run(main())