Skip to content

Commit

Permalink
Merge pull request #655 from kdmukai/hardwarebutton_refactor
Browse files Browse the repository at this point in the history
[Refactor / bugfix] Simplify `HardwareButtons`
  • Loading branch information
newtonick authored Jan 13, 2025
2 parents 24d688a + e972fc8 commit 825a25a
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 158 deletions.
33 changes: 16 additions & 17 deletions src/seedsigner/gui/screens/screen.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import time

from dataclasses import dataclass, field
Expand All @@ -15,6 +16,8 @@
from seedsigner.models.settings import SettingsConstants
from seedsigner.models.threads import BaseThread, ThreadsafeCounter

logger = logging.getLogger(__name__)


# Must be huge numbers to avoid conflicting with the selected_button returned by the
# screens with buttons.
Expand Down Expand Up @@ -71,6 +74,13 @@ def display(self) -> Any:
for t in self.get_threads():
t.stop()

for t in self.get_threads():
# Wait for each thread to stop; equivalent to `join()` but gracefully
# handles threads that were never run (necessary for screenshot generator
# compatibility, perhaps other edge cases).
while t.is_alive():
time.sleep(0.01)


def clear_screen(self):
# Clear the whole canvas
Expand Down Expand Up @@ -226,11 +236,7 @@ def _run(self):
time.sleep(0.1)
continue

user_input = self.hw_inputs.wait_for(
HardwareButtonsConstants.ALL_KEYS,
check_release=True,
release_keys=HardwareButtonsConstants.KEYS__ANYCLICK
)
user_input = self.hw_inputs.wait_for(HardwareButtonsConstants.ALL_KEYS)

with self.renderer.lock:
if not self.top_nav.is_selected and user_input in [
Expand Down Expand Up @@ -447,6 +453,7 @@ def _run(self):
while True:
ret = self._run_callback()
if ret is not None:
logging.info("Exiting ButtonListScreen due to _run_callback")
return ret

user_input = self.hw_inputs.wait_for(
Expand All @@ -455,9 +462,7 @@ def _run(self):
HardwareButtonsConstants.KEY_DOWN,
HardwareButtonsConstants.KEY_LEFT,
HardwareButtonsConstants.KEY_RIGHT,
] + HardwareButtonsConstants.KEYS__ANYCLICK,
check_release=True,
release_keys=HardwareButtonsConstants.KEYS__ANYCLICK
] + HardwareButtonsConstants.KEYS__ANYCLICK
)

with self.renderer.lock:
Expand Down Expand Up @@ -641,9 +646,7 @@ def swap_selected_button(new_selected_button: int):
HardwareButtonsConstants.KEY_DOWN,
HardwareButtonsConstants.KEY_LEFT,
HardwareButtonsConstants.KEY_RIGHT
] + HardwareButtonsConstants.KEYS__ANYCLICK,
check_release=True,
release_keys=HardwareButtonsConstants.KEYS__ANYCLICK
] + HardwareButtonsConstants.KEYS__ANYCLICK
)

with self.renderer.lock:
Expand Down Expand Up @@ -858,9 +861,7 @@ def _run(self):
HardwareButtonsConstants.KEY_DOWN,
HardwareButtonsConstants.KEY_LEFT,
HardwareButtonsConstants.KEY_RIGHT,
] + HardwareButtonsConstants.KEYS__ANYCLICK,
check_release=True,
release_keys=HardwareButtonsConstants.KEYS__ANYCLICK
] + HardwareButtonsConstants.KEYS__ANYCLICK
)
if user_input == HardwareButtonsConstants.KEY_DOWN:
# Reduce QR code background brightness
Expand Down Expand Up @@ -1192,9 +1193,7 @@ def _run(self):
# Start the interactive update loop
while True:
input = self.hw_inputs.wait_for(
HardwareButtonsConstants.KEYS__LEFT_RIGHT_UP_DOWN + [HardwareButtonsConstants.KEY_PRESS, HardwareButtonsConstants.KEY3],
check_release=True,
release_keys=[HardwareButtonsConstants.KEY_PRESS, HardwareButtonsConstants.KEY3]
HardwareButtonsConstants.KEYS__LEFT_RIGHT_UP_DOWN + [HardwareButtonsConstants.KEY_PRESS, HardwareButtonsConstants.KEY3]
)

with self.renderer.lock:
Expand Down
35 changes: 15 additions & 20 deletions src/seedsigner/gui/screens/seed_screens.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,7 @@ def _render(self):

def _run(self):
while True:
input = self.hw_inputs.wait_for(
HardwareButtonsConstants.ALL_KEYS,
check_release=True,
release_keys=[HardwareButtonsConstants.KEY_PRESS, HardwareButtonsConstants.KEY2]
)
input = self.hw_inputs.wait_for(HardwareButtonsConstants.ALL_KEYS)

with self.renderer.lock:
if self.is_input_in_top_nav:
Expand Down Expand Up @@ -885,11 +881,7 @@ def _run(self):

# Start the interactive update loop
while True:
input = self.hw_inputs.wait_for(
HardwareButtonsConstants.ALL_KEYS,
check_release=True,
release_keys=[HardwareButtonsConstants.KEY_PRESS, HardwareButtonsConstants.KEY1, HardwareButtonsConstants.KEY2, HardwareButtonsConstants.KEY3]
)
input = self.hw_inputs.wait_for(HardwareButtonsConstants.ALL_KEYS)

keyboard_swap = False

Expand Down Expand Up @@ -1466,13 +1458,13 @@ def __post_init__(self):


def _run_callback(self):
# Exit the screen on success via a non-None value
logger.info(f"verified_index: {self.verified_index.cur_count}")
# Exit the screen on success via a non-None value.
# see: ButtonListScreen._run()
if self.verified_index.cur_count is not None:
logger.info("Screen callback returning success!")
self.threads[-1].stop()
while self.threads[-1].is_alive():
time.sleep(0.01)
# Note that the ProgressThread will have already exited on its own.

# Return a success value (anything other than None) to end the
# ButtonListScreen._run() loop.
return 1


Expand All @@ -1489,10 +1481,13 @@ def run(self):
while self.keep_running:
if self.verified_index.cur_count is not None:
# This thread will detect the success state while its parent Screen
# holds in its `wait_for`. Have to trigger a hw_input event to break
# the Screen._run out of the `wait_for` state. The Screen will then
# call its `_run_callback` and detect the success state and exit.
HardwareButtons.get_instance().trigger_override(force_release=True)
# blocks in its `wait_for`. Have to trigger a hw_input override event
# to break the Screen._run out of the `wait_for` state. The Screen
# will then call its `_run_callback` and detect the success state and
# exit.
HardwareButtons.get_instance().trigger_override()

# Exit the loop and thereby end this thread
return

textarea = TextArea(
Expand Down
2 changes: 1 addition & 1 deletion src/seedsigner/gui/screens/settings_screens.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def _run(self):
screen_y=int((self.canvas_height - msg_height)/ 2),
)
while True:
input = self.hw_inputs.wait_for(keys=HardwareButtonsConstants.ALL_KEYS, check_release=False)
input = self.hw_inputs.wait_for(keys=HardwareButtonsConstants.ALL_KEYS)

if input == HardwareButtonsConstants.KEY1:
# Note that there are three distinct screen updates that happen at
Expand Down
119 changes: 50 additions & 69 deletions src/seedsigner/hardware/buttons.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class HardwareButtons(Singleton):
KEY2_PIN = 12
KEY3_PIN = 8


@classmethod
def get_instance(cls):
# This is the only way to access the one and only instance
Expand All @@ -53,8 +54,6 @@ def get_instance(cls):
cls._instance.GPIO = GPIO
cls._instance.override_ind = False

cls._instance.add_events([HardwareButtonsConstants.KEY_UP, HardwareButtonsConstants.KEY_DOWN, HardwareButtonsConstants.KEY_PRESS, HardwareButtonsConstants.KEY_LEFT, HardwareButtonsConstants.KEY_RIGHT, HardwareButtonsConstants.KEY1, HardwareButtonsConstants.KEY2, HardwareButtonsConstants.KEY3])

# Track state over time so we can apply input delays/ignores as needed
cls._instance.cur_input = None # Track which direction or button was last pressed
cls._instance.cur_input_started = None # Track when that input began
Expand All @@ -65,25 +64,30 @@ def get_instance(cls):
return cls._instance



@classmethod
def get_instance_no_hardware(cls):
# This is the only way to access the one and only instance
if cls._instance is None:
cls._instance = cls.__new__(cls)


def wait_for(self, keys=[]) -> int:
"""
Block execution until one of the target keys is pressed.
def wait_for(self, keys=[], check_release=True, release_keys=[]) -> int:
Optionally override the wait by calling `trigger_override()`.
"""
# TODO: Refactor to keep control in the Controller and not here
from seedsigner.controller import Controller
controller = Controller.get_instance()

if not release_keys:
release_keys = keys
self.override_ind = False

while True:
if self.override_ind:
# Break out of the wait_for without waiting for user input
self.override_ind = False
return HardwareButtonsConstants.OVERRIDE

cur_time = int(time.time() * 1000)
if cur_time - self.last_input_time > controller.screensaver_activation_ms and not controller.is_screensaver_running:
# Start the screensaver. Will block execution until input detected.
Expand All @@ -99,48 +103,42 @@ def wait_for(self, keys=[], check_release=True, release_keys=[]) -> int:
# Resume from a fresh loop
continue

# Check each candidate key to see if it was pressed
for key in keys:
if not check_release or ((check_release and key in release_keys and HardwareButtonsConstants.release_lock) or check_release and key not in release_keys):
# when check release is False or the release lock is released (True)
if self.GPIO.input(key) == GPIO.LOW or self.override_ind:
HardwareButtonsConstants.release_lock = False
if self.override_ind:
self.override_ind = False
return HardwareButtonsConstants.OVERRIDE

if self.cur_input != key:
self.cur_input = key
self.cur_input_started = int(time.time() * 1000) # in milliseconds
self.last_input_time = self.cur_input_started
if self.GPIO.input(key) == GPIO.LOW:
if self.cur_input != key:
self.cur_input = key
self.cur_input_started = int(time.time() * 1000) # in milliseconds
self.last_input_time = self.cur_input_started
return key

else:
# Still pressing the same input
if cur_time - self.last_input_time > self.next_repeat_threshold:
# Too much time has elapsed to consider this the same
# continuous input. Treat as a new separate press.
self.cur_input_started = cur_time
self.last_input_time = cur_time
return key

elif cur_time - self.cur_input_started > self.first_repeat_threshold:
# We're good to relay this immediately as continuous
# input.
self.last_input_time = cur_time
return key

else:
# Still pressing the same input
if cur_time - self.last_input_time > self.next_repeat_threshold:
# Too much time has elapsed to consider this the same
# continuous input. Treat as a new separate press.
self.cur_input_started = cur_time
self.last_input_time = cur_time
return key

elif cur_time - self.cur_input_started > self.first_repeat_threshold:
# We're good to relay this immediately as continuous
# input.
self.last_input_time = cur_time
return key

else:
# We're not yet at the first repeat threshold; triggering
# a key now would be too soon and yields a bad user
# experience when only a single click was intended but
# a second input is processed because of race condition
# against human response time to release the button.
# So there has to be a delay before we allow the first
# continuous repeat to register. So we'll ignore this
# round's input and **won't update any of our
# timekeeping vars**. But once we cross the threshold,
# we let the repeats fly.
pass
# We're not yet at the first repeat threshold; triggering
# a key now would be too soon and yields a bad user
# experience when only a single click was intended but
# a second input is processed because of race condition
# against human response time to release the button.
# So there has to be a delay before we allow the first
# continuous repeat to register. So we'll ignore this
# round's input and **won't update any of our
# timekeeping vars**. But once we cross the threshold,
# we let the repeats fly.
pass

time.sleep(0.01) # wait 10 ms to give CPU chance to do other things

Expand All @@ -149,29 +147,13 @@ def update_last_input_time(self):
self.last_input_time = int(time.time() * 1000)


def add_events(self, keys=[]):
for key in keys:
GPIO.add_event_detect(key, self.GPIO.RISING, callback=HardwareButtons.rising_callback)

def trigger_override(self) -> bool:
""" Set the override flag to break out of the current `wait_for` loop """
self.override_ind = True

def rising_callback(channel):
HardwareButtonsConstants.release_lock = True


def trigger_override(self, force_release = False) -> bool:
if force_release:
HardwareButtonsConstants.release_lock = True

if not self.override_ind:
self.override_ind = True
return True
return False

def force_release(self) -> bool:
HardwareButtonsConstants.release_lock = True
return True

def check_for_low(self, key: int = None, keys: List[int] = None) -> bool:
""" Returns True if one of the target keys/key is pressed """
if key:
keys = [key]
for key in keys:
Expand All @@ -181,15 +163,16 @@ def check_for_low(self, key: int = None, keys: List[int] = None) -> bool:
else:
return False


def has_any_input(self) -> bool:
""" Returns True if any of the keys are pressed """
for key in HardwareButtonsConstants.ALL_KEYS:
if self.GPIO.input(key) == GPIO.LOW:
return True
return False


# class used as short hand for static button/channel lookup values
# TODO: Implement `release_lock` functionality as a global somewhere. Mixes up design
# patterns to have a static constants class plus a settable global value.
class HardwareButtonsConstants:
if GPIO.RPI_INFO['P1_REVISION'] == 3: #This indicates that we have revision 3 GPIO
KEY_UP = 31
Expand Down Expand Up @@ -227,5 +210,3 @@ class HardwareButtonsConstants:

KEYS__LEFT_RIGHT_UP_DOWN = [KEY_LEFT, KEY_RIGHT, KEY_UP, KEY_DOWN]
KEYS__ANYCLICK = [KEY_PRESS, KEY1, KEY2, KEY3]

release_lock = True # released when True, locked when False
Loading

0 comments on commit 825a25a

Please sign in to comment.