Skip to content

Commit

Permalink
Merge pull request #93 from somakeit/51-add-error-checking-to-smibhid…
Browse files Browse the repository at this point in the history
…-state

51 add error checking to smibhid state
  • Loading branch information
sam57719 authored May 15, 2024
2 parents f7b387e + 70dc785 commit dc4c755
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 21 deletions.
1 change: 1 addition & 0 deletions smibhid/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Press the space_open or space_closed buttons to call the smib server endpoint ap
- LED flashes while trying to set state so you know it's trying to do something
- Confirms the space state after change by calling space_state
- Regularly polls for space state (polling period configurable in config.py) and updates the SMIBHID status appropriately to sync with other space state controls
- Flashes both space state LEDs at 2Hz if space state cannot be determined

## Circuit diagram
### Pico W Connections
Expand Down
97 changes: 76 additions & 21 deletions smibhid/lib/hid.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ulogging import uLogger
import config
from button import Button
from asyncio import Event, create_task, get_event_loop, sleep
from asyncio import Event, create_task, get_event_loop, sleep, wait_for
from utils import StatusLED
from slack_api import Wrapper
from lib.networking import WirelessNetwork
Expand All @@ -28,6 +28,9 @@ def __init__(self, loglevel: int) -> None:
self.slack_api = Wrapper(loglevel, self.wifi)
self.loop_running = False
self.space_state = None
self.space_state_check_in_error_state = False
self.checking_space_state = False
self.checking_space_state_timeout_s = 30

self.space_state_poll_frequency = config.space_state_poll_frequency_s
if self.space_state_poll_frequency != 0 and self.space_state_poll_frequency < 5:
Expand Down Expand Up @@ -61,40 +64,92 @@ def startup(self) -> None:

def set_output_space_open(self) -> None:
"""Set LED's display etc to show the space as open"""
self.space_state = True
self.space_open_led.on()
self.space_closed_led.off()
self.log.info("Space state is open.")

def set_output_space_closed(self) -> None:
"""Set LED's display etc to show the space as closed"""
self.space_state = False
self.space_open_led.off()
self.space_closed_led.on()
self.log.info("Space state is closed.")

def set_output_space_none(self) -> None:
"""Set LED's display etc to show the space as none"""
self.space_state = None
self.space_open_led.off()
self.space_closed_led.off()
self.log.info("Space state is none.")

def _set_space_state_check_to_error(self) -> None:
"""Activities relating to space_state check moving to error state"""
self.log.info("Space state check has errored.")
self.space_state_check_in_error_state = True
self.state_check_error_open_led_flash_task = create_task(self.space_open_led.async_constant_flash(2))
self.state_check_error_closed_led_flash_task = create_task(self.space_closed_led.async_constant_flash(2))

def _set_space_state_check_to_ok(self) -> None:
"""Activities relating to space_state check moving to ok state"""
self.log.info("Space state check status error has cleared")
self.space_state_check_in_error_state = False
self.state_check_error_open_led_flash_task.cancel()
self.state_check_error_closed_led_flash_task.cancel()
self.space_open_led.off()
self.space_closed_led.off()

def _free_to_check_space_state(self) -> bool:
"""Check that we're not already checking for space state"""
self.log.info("Checking space state check state")
if self.checking_space_state:
self.log.warn("Already checking space state")
return False
else:
self.log.info("Free to check space state")
self.checking_space_state = True
return True

def _set_space_output(self, new_space_state: bool | None) -> None:
"""Call appropriate space output configuration method for new space state."""
if new_space_state is OPEN:
self.set_output_space_open()
elif new_space_state is CLOSED:
self.set_output_space_closed()
elif new_space_state is None:
self.set_output_space_none()
else:
raise ValueError("Space state is not an expected value")

async def async_update_space_state_output(self) -> None:
"""Checks space state from server and sets SMIDHID output to reflect current space state, including errors if space state not available."""
try:
space_state = await self.slack_api.async_get_space_state()
self.log.info(f"Space state is: {space_state}")
if space_state != self.space_state:
self.space_state = space_state
if space_state is OPEN:
self.set_output_space_open()
elif space_state is CLOSED:
self.set_output_space_closed()
elif space_state is None:
self.set_output_space_none()
else:
raise ValueError("Space state is not an expected value")
except Exception as e:
self.log.error(f"Error encountered polling updating space state: {e}")
raise
"""
Checks space state from server and sets SMIDHID output to reflect current space state, including errors if space state not available.
"""
self.log.info("Checking space state")
if not self._free_to_check_space_state():
return
else:
try:
self.log.info("Checking space status from server")
new_space_state = await wait_for(self.slack_api.async_get_space_state(), self.checking_space_state_timeout_s)
self.log.info(f"Space state is: {new_space_state}")
if new_space_state != self.space_state:
self.log.info("Space state changed")
self._set_space_output(new_space_state)

if self.space_state_check_in_error_state:
self.log.info("Space state unchanged")
self._set_space_state_check_to_ok()

except Exception as e:
self.log.error(f"Error encountered updating space state: {e}")
if not self.space_state_check_in_error_state:
self._set_space_state_check_to_error()
raise

finally:
self.log.info("Setting checking_space_state to False")
self.checking_space_state = False

async def async_space_opened_watcher(self) -> None:
"""
Expand All @@ -108,7 +163,7 @@ async def async_space_opened_watcher(self) -> None:
await self.slack_api.async_space_open()
flash_task.cancel()
self.set_output_space_open()
await self.async_update_space_state_output()
create_task(self.async_update_space_state_output())
except Exception as e:
self.log.error(f"An exception was encountered trying to set SMIB space state: {e}")
flash_task.cancel()
Expand All @@ -126,7 +181,7 @@ async def async_space_closed_watcher(self) -> None:
await self.slack_api.async_space_closed()
flash_task.cancel()
self.set_output_space_closed()
await self.async_update_space_state_output()
create_task(self.async_update_space_state_output())
except Exception as e:
self.log.error(f"An exception was encountered trying to set SMIB space state: {e}")
flash_task.cancel()
Expand All @@ -139,7 +194,7 @@ async def async_space_state_watcher(self) -> None:
while True:
self.log.info("Polling space state")
try:
await self.async_update_space_state_output()
create_task(self.async_update_space_state_output())
except Exception as e:
self.log.error(f"State poller encountered an error updating space state: {e}")
finally:
Expand Down

0 comments on commit dc4c755

Please sign in to comment.