-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
326 lines (269 loc) · 10.2 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
import json
import math
import platform
import threading
from typing import List, Dict
from kivy.clock import mainthread, Clock
from kivy.core.image import Image
from kivy.loader import Loader
from kivy.uix.screenmanager import Screen
from kivymd.app import MDApp
from kivymd.uix.boxlayout import BoxLayout
from kivymd.uix.card import MDCard
from kivymd.uix.screen import MDScreen
from kivymd.uix.list.list import MDListItem
from kivy.properties import StringProperty, ListProperty, NumericProperty
import models
from utils.mqtt import MQTT
from kivy.properties import StringProperty
from kivy.lang import Builder
from kivy.core.window import Window
from kivy.uix.widget import Widget
import database
import models
import rfid_reader
from enum import Enum
RUNNING_ON_TARGET = False # Store if this is running on raspberry pi
# Check if this is running on Raspberry Pi
if platform.system() == "Linux" and "rpi" in platform.uname().release:
RUNNING_ON_TARGET = True
MOCK_ITEM = models.Item(1, "Swedish Fish", 11111, 3.19, 5, 266, 26, 'http://placehold.jp/150x150.png', 'pouch')
# HARDCODED SHELF DATA
# Shelf Address -> Slot number (list) -> Tuple (Item ID in database, division factor for weight)
SHELF_DATA = {
'80:65:99:E3:8B:92': [(1, 0.26), (1, 0.26), (1, 0.26), (1, 0.26)],
'id2': [(1, 0.26), (1, 0.26), (1, 0.26), (1, 0.26)],
'id3': [(1, 0.26), (1, 0.26), (1, 0.26), (1, 0.26)],
'id4': [(1, 0.26), (1, 0.26), (1, 0.26), (1, 0.26)]
}
class InfoScreen(MDScreen):
pass
class CartScreen(Screen):
cart_items = ListProperty([])
def _refresh_cart(self):
cart_view = self.children[0].children[1].children[0]
cart_view.data = [{'title': item['title'], 'source': item['source'], 'price': item['price'], 'quantity': item['quantity']} for item in self.cart_items]
cart_view.refresh_from_data()
def add_item(self, item: models.Item, quantity: int):
quantity = abs(quantity)
print(f"NEW QUANTITY {quantity}")
"""
Add an item to the
:param item:
:param quantity:
:return:
"""
# Check if this item is already in the cart
for i in range(len(self.cart_items)):
if self.cart_items[i]['title'] == item.name:
# Item is already in the cart
self.cart_items[i]['quantity'] = str(int(self.cart_items[i]['quantity']) + quantity)
self._refresh_cart()
return
item_to_add = {'title': str(item.name), 'source': str(item.thumbnail_url), 'price': str(item.price), 'quantity': str(quantity)}
self.cart_items.insert(0, item_to_add)
self._refresh_cart()
def remove_item(self, item: models.Item, quantity_to_remove: int):
"""
Remove an item from the cart
:param item: Item to remove
:param quantity_to_remove: Quantity to remove
:return:
"""
quantity_to_remove = abs(quantity_to_remove)
for i in range(len(self.cart_items)):
if self.cart_items[i]['title'] == item.name:
# This item is the one to remove
new_quantity = int(self.cart_items[i]['quantity']) - quantity_to_remove
if new_quantity <= 0:
# Remove this item completely
del self.cart_items[i]
else:
# Adjust the quantity of this item
self.cart_items[i]['quantity'] = str(new_quantity)
# Refresh view
self._refresh_cart()
return
def empty_cart(self):
"""
Empty the cart
:return:
"""
self.cart_items.clear()
self._refresh_cart()
class StartScreen(MDScreen):
pass
class AttractScreen(MDScreen):
pass
class DebugScreen(MDScreen):
def open_door(self):
print("Opening door")
pass
class CancelScreen(MDScreen):
pass
class ThankYouScreen(MDScreen):
pass
class CartItem(MDListItem):
title = StringProperty()
source = StringProperty()
price = StringProperty()
quantity = StringProperty()
class ImageButton(Widget):
source = StringProperty()
text = StringProperty()
class ShelfItem(Widget):
shelf = NumericProperty()
slot = NumericProperty()
text = StringProperty()
def on_shelf(self, instance, value):
self.text = f"Shelf: {self.shelf} Slot: {self.slot}"
def on_slot(self, instance, value):
self.text = f"Shelf: {self.shelf} Slot: {self.slot}"
def calibrate(self):
pass
class MemberCard(MDCard):
name = StringProperty()
job_title = StringProperty()
major = StringProperty()
class States(Enum):
WAITING_FOR_USER_TOKEN = 1
CART_DOOR_OPEN = 2
THANK_YOU = 3
CANCEL_WAIT_FOR_DOOR_CLOSE = 4
DEBUG = 5
ABOUT = 6
class MainApp(MDApp):
connected_shelves: Dict[str, models.Shelf]
current_user: models.User | None = None
state: States
cart_screen: CartScreen | None
def __init__(self, **kwargs):
super().__init__()
self.mqtt_client = None
self.connected_shelves = dict()
self.state = States.WAITING_FOR_USER_TOKEN
self.cart_screen = None
def build(self):
Window.size = (800,480)
if RUNNING_ON_TARGET:
Window.show_cursor = False
# Set default loading image
Loader.loading_image = Image('./images/item_placeholder.png')
self.root = BoxLayout()
self.root.add_widget(Builder.load_file('app.kv'))
self.cart_screen: CartScreen = self.root.children[0].screens[1]
self.root.children[0].current = 'Start'
self.state = States.WAITING_FOR_USER_TOKEN
self.mqtt_client = MQTT()
self.mqtt_client.start_listening()
self.mqtt_client.set_rfid_user_callback(self.user_tap_callback)
self.mqtt_client.set_shelf_data_callback(self.shelf_data_callback)
self.mqtt_client.set_door_closed_callback(self.door_closed_callback)
# Read in all shelves
for shelf_id in SHELF_DATA:
items = list()
conversion_factors = list()
for item, conversion_factor in SHELF_DATA[shelf_id]:
items.append(MOCK_ITEM)
conversion_factors.append(conversion_factor)
shelf = models.Shelf(items)
for i in range(len(shelf.slots)):
shelf.slots[i].set_conversion_factor(conversion_factors[i])
self.connected_shelves[shelf_id] = shelf
@mainthread
def user_tap_callback(self, user_txt):
if user_txt[:5] != "User[":
return
user_split = user_txt[5:].split(',')
user = models.User(
user_split[0],
user_split[1],
user_split[2],
user_split[3][0:],
user_split[4],
user_split[5],
user_split[6]
)
if self.state == States.WAITING_FOR_USER_TOKEN:
if 'admin' in user.name:
# Admin user
self.root.children[0].transition.direction = 'right'
self.root.children[0].current = 'Debug'
else:
self.current_user = user
self.open_cart_screen()
@mainthread
def door_closed_callback(self, message):
"""
Callback for when door closed message is received over MQTT
:return:
"""
if self.state == States.CANCEL_WAIT_FOR_DOOR_CLOSE:
# Cancelled transaction waiting on door to close. Go to start screen
self.go_to_start_screen()
elif self.state == States.CART_DOOR_OPEN:
# Active transaction signal end. Go to thank you screen, then go to cart screen
self.root.children[0].transition.direction = 'left'
self.root.children[0].current = 'ThankYou'
Clock.schedule_once(self.go_to_start_screen(), 3)
def go_to_start_screen(self):
self.root.children[0].transition.direction = 'right'
self.root.children[0].current = 'Start'
self.current_user = None
def open_cart_screen(self):
"""
Open the cart screen
:return:
"""
self.cart_screen.empty_cart()
self.root.children[0].transition.direction = 'left'
self.root.children[0].current = 'Cart'
self.mqtt_client.publish_doors_open()
self.state = States.CART_DOOR_OPEN
def open_door(self):
self.mqtt_client.publish_doors_open()
def cancel_transaction(self):
"""
Cancel transaction
:return:
"""
self.state = States.CANCEL_WAIT_FOR_DOOR_CLOSE
self.root.children[0].transition.direction = 'left'
self.root.children[0].current = 'Cancel'
@mainthread
def shelf_data_callback(self, data_string):
"""
Callback function for when data is received for a shelf via MQTT
:param data_string: The data string, directly from MQTT
:return:
"""
#try:
# Deconstruct JSON data
data = json.loads(data_string)
print(data)
shelf_id = data['id']
slot_values = data['data']
send_time_millis = data['time']
# Check that slot values are a list
if not isinstance(slot_values, list):
raise Exception("IncorrectFormat: Slot values incorrect format (not list)")
if shelf_id in self.connected_shelves:
adjustments = self.connected_shelves[shelf_id].update(slot_values)
for item, quantity_adjust in adjustments:
if quantity_adjust < 0:
self.cart_screen.add_item(item, quantity_adjust)
print("ADD ITEM TO CART")
elif quantity_adjust > 0:
self.cart_screen.remove_item(item, quantity_adjust)
print("REMOVE ITEM FROM CART")
# except KeyError as key_error:
# print("KeyError when parsing shelf data from MQTT")
# print(f"\tData: '{data_string}'")
# print(f"\tFull exception: {key_error}")
# except Exception as e:
# print("Exception occurred when reading shelf data from MQTT")
# print(f"Full exception: {e}")
def stop(self, *largs):
self.mqtt_client.stop_listening()
if __name__ == "__main__":
MainApp().run()