Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Add Popup windows for misc info and hotkeys
Browse files Browse the repository at this point in the history
Update unit tests for MCB and Interface with magic mocks
Update parser unit tests to remove reference to ASSET_DIR
  • Loading branch information
dmitri-mcguckin committed Feb 18, 2021
1 parent ab3a5ea commit dc1cacf
Show file tree
Hide file tree
Showing 17 changed files with 292 additions and 129 deletions.
23 changes: 21 additions & 2 deletions canopen_monitor/__main__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from . import CONFIG_DIR, CACHE_DIR
import argparse
from . import APP_NAME, APP_DESCRIPTION, CONFIG_DIR, CACHE_DIR
from .app import App
from .can import MagicCANBus, MessageTable
from .parse import CANOpenParser, load_eds_file
Expand All @@ -20,13 +21,31 @@ def load_eds_files(filepath: str = CACHE_DIR) -> dict:


def main():
parser = argparse.ArgumentParser(prog=APP_NAME,
description=APP_DESCRIPTION,
allow_abbrev=False)
parser.add_argument('-i', '--interface',
dest='interfaces',
type=str,
nargs='+',
default=['vcan0'],
help='A list of interfaces to bind to.')
parser.add_argument('--no-block',
dest='no_block',
action='store_true',
default=False,
help='Disable block-waiting for the Magic CAN Bus.'
' (Warning, this may produce undefined'
' behavior).')
args = parser.parse_args()

try:
init_dirs()
eds_configs = load_eds_files()
mt = MessageTable(CANOpenParser(eds_configs))

# Start the can bus and the curses app
with MagicCANBus(['vcan0', 'vcan1']) as bus, \
with MagicCANBus(args.interfaces, no_block=args.no_block) as bus, \
App(mt) as app:
while True:
# Bus updates
Expand Down
71 changes: 64 additions & 7 deletions canopen_monitor/app.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
from __future__ import annotations
import curses
import datetime as dt
from enum import Enum
from . import APP_NAME, APP_VERSION, APP_LICENSE, APP_AUTHOR, APP_DESCRIPTION, APP_URL
from .can import MessageTable, MessageType
from .ui import MessagePane
from .ui import MessagePane, PopupWindow


def pad_hex(value: int) -> str:
return f'0x{hex(value).upper()[2:].rjust(3, "0")}'


class KeyMap(Enum):
F1 = ('F1', 'Toggle app info menu', curses.KEY_F1)
F2 = ('F2', 'Toggle this menu', curses.KEY_F2)
UP_ARR = ('Up Arrow', 'Scroll pane up 1 row', curses.KEY_UP)
DOWN_ARR = ('Down Arrow', 'Scroll pane down 1 row', curses.KEY_DOWN)
LEFT_ARR = ('Left Arrow', 'Scroll pane left 4 cols', curses.KEY_LEFT)
RIGHT_ARR = ('Right Arrow', 'Scroll pane right 4 cols', curses.KEY_RIGHT)
S_UP_ARR = ('Shift + Up Arrow', 'Scroll pane up 16 rows', 337)
S_DOWN_ARR = ('Shift + Down Arrow', 'Scroll pane down 16 rows', 336)
C_UP_ARR = ('Ctrl + Up Arrow', 'Move pane selection up', 567)
C_DOWN_ARR = ('Ctrl + Down Arrow', 'Move pane selection down', 526)
RESIZE = ('Resize Terminal',
'Reset the dimensions of the app',
curses.KEY_RESIZE)


class App:
"""The User Interface
"""
Expand All @@ -27,10 +45,30 @@ def __enter__(self: App):
curses.curs_set(False) # Disable the cursor
self.__init_color_pairs() # Enable colors and create pairs

# Don't initialize any sub-panes or grids until standard io screen has
# been initialized
# Don't initialize any grids, sub-panes, or windows until standard io
# screen has been initialized
height, width = self.screen.getmaxyx()
height -= 1
self.info_win = PopupWindow(self.screen,
header=f'{APP_NAME.title()}'
f' v{APP_VERSION}',
content=[f'author: {APP_AUTHOR}',
f'license: {APP_LICENSE}',
f'respository: {APP_URL}',
'',
'Description:',
f'{APP_DESCRIPTION}'],
footer='F1: exit window',
style=curses.color_pair(1))
self.hotkeys_win = PopupWindow(self.screen,
header='Hotkeys',
content=list(
map(lambda x:
f'{x.value[0]}: {x.value[1]}'
f' ({x.value[2]})',
list(KeyMap))),
footer='F2: exit window',
style=curses.color_pair(1))
self.hb_pane = MessagePane(cols={'Node ID': ('node_name', 0, hex),
'State': ('state', 0),
'Status': ('message', 0)},
Expand Down Expand Up @@ -101,6 +139,16 @@ def _handle_keyboard_input(self: App) -> None:
self.__select_pane(self.hb_pane, 0)
elif(input == 526): # Ctrl + Down
self.__select_pane(self.misc_pane, 1)
elif(input == curses.KEY_F1):
if(self.hotkeys_win.enabled):
self.hotkeys_win.toggle()
self.hotkeys_win.clear()
self.info_win.toggle()
elif(input == curses.KEY_F2):
if(self.info_win.enabled):
self.info_win.toggle()
self.info_win.clear()
self.hotkeys_win.toggle()

def __init_color_pairs(self: App) -> None:
curses.start_color()
Expand Down Expand Up @@ -140,10 +188,19 @@ def __draw__footer(self: App) -> None:
self.screen.addstr(height - 1, 1, footer)

def draw(self: App, ifaces: [tuple]):
self.__draw_header(ifaces)
self.hb_pane.draw()
self.misc_pane.draw()
self.__draw__footer()
window_active = self.info_win.enabled or self.hotkeys_win.enabled
self.__draw_header(ifaces) # Draw header info

# Draw panes
if(not window_active):
self.hb_pane.draw()
self.misc_pane.draw()

# Draw windows
self.info_win.draw()
self.hotkeys_win.draw()

self.__draw__footer() # Draw footer info

def refresh(self: App):
self.screen.refresh()
9 changes: 3 additions & 6 deletions canopen_monitor/can/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from pyvit.hw.socketcan import SocketCanDev


_SOCK_TIMEOUT = 0.3
_SOCK_TIMEOUT = 0.1
_STALE_INTERFACE = dt.timedelta(minutes=1)


Expand Down Expand Up @@ -183,8 +183,5 @@ def age(self: Interface) -> dt.timedelta:
"""
return dt.datetime.now() - self.last_activity

def __repr__(self: Interface) -> str:
return f'({self.name}:' \
f' {"UP" if self.is_up else "DOWN"},' \
f' {dt.datetime.now() - self.last_activity},' \
f' Bound: {self.listening}'
def __str__(self: Interface) -> str:
return self.name
24 changes: 16 additions & 8 deletions canopen_monitor/can/magic_can_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ class MagicCANBus:
:type interfaces: [Interface]
"""

def __init__(self: MagicCANBus, if_names: [str]):
def __init__(self: MagicCANBus, if_names: [str], no_block: bool = False):
self.interfaces = list(map(lambda x: Interface(x), if_names))
self.no_block = no_block
self.keep_alive = t.Event()
self.keep_alive.set()
self.message_queue = queue.SimpleQueue()
Expand Down Expand Up @@ -97,9 +98,15 @@ def __exit__(self: MagicCANBus,
evalue: str,
traceback: any) -> None:
self.keep_alive.clear()
for tr in self.threads:
print(f'Waiting for thread {tr} to end...')
tr.join()
if(self.no_block):
print('WARNING: Skipping wait-time for threads to close'
' gracefully.')
else:
print('Press <Ctrl + C> to quit without waiting.')
for tr in self.threads:
print(f'Waiting for thread {tr} to end... ', end='')
tr.join()
print('Done!')

def __iter__(self: MagicCANBus) -> MagicCANBus:
return self
Expand All @@ -109,10 +116,11 @@ def __next__(self: MagicCANBus) -> Message:
raise StopIteration
return self.message_queue.get(block=True)

def __repr__(self: MagicCANBus) -> str:
alive_threads = sum(map(lambda x: 1 if x.is_alive() else 0,
self.threads))
return f"Magic Can Bus: {self.interfaces}," \
def __str__(self: MagicCANBus) -> str:
# Subtract 1 since the parent thread should not be counted
alive_threads = t.active_count() - 1
if_list = ', '.join(list(map(lambda x: str(x), self.interfaces)))
return f"Magic Can Bus: {if_list}," \
f" pending messages: {self.message_queue.qsize()}" \
f" threads: {alive_threads}," \
f" keep-alive: {self.keep_alive.is_set()}"
4 changes: 3 additions & 1 deletion canopen_monitor/ui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
of Curses UI and general user interaction with the app,
"""
from .pane import Pane
from .windows import PopupWindow
from .message_pane import MessagePane

__all__ = [
"Pane",
"MessagePane"
"MessagePane",
"PopupWindow"
]
3 changes: 3 additions & 0 deletions canopen_monitor/ui/pane.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ def add_line(self: Pane,
x: int,
line: str,
bold: bool = False,
underline: bool = False,
highlight: bool = False,
color: any = None) -> None:
"""Adds a line of text to the Pane and if needed, it handles the
Expand Down Expand Up @@ -278,6 +279,8 @@ def add_line(self: Pane,
line_style |= curses.A_BOLD
if(highlight):
line_style |= curses.A_REVERSE
if(underline):
line_style |= curses.A_UNDERLINE

# Add the line
if(y < self.d_height):
Expand Down
126 changes: 89 additions & 37 deletions canopen_monitor/ui/windows.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,92 @@
from __future__ import annotations
import curses
from .pane import Pane


class PopupWindow:
def __init__(self, parent, message, banner='fatal', color_pair=3):
height, width = parent.getmaxyx()
style = curses.color_pair(color_pair) | curses.A_REVERSE
any_key_message = "Press any key to continue..."
message = message.split('\n')
long = len(any_key_message)

for m in message:
if(len(m) > long):
long = len(m)
if(long < len(banner)):
long = len(banner)
long += 1

window = curses.newwin(len(message) + 2,
long + 2,
int((height - len(message) + 2) / 2),
int((width - long + 2) / 2))
window.attron(style)
for i, m in enumerate(message):
window.addstr(1 + i, 1, m.ljust(long, ' '))
window.box()
window.addstr(0, 1, banner + ":", curses.A_UNDERLINE | style)
window.addstr(len(message) + 1,
long - len(any_key_message),
any_key_message)

window.attroff(style)

window.refresh()
parent.refresh()

window.getch()
curses.flushinp()
window.clear()
parent.clear()
class PopupWindow(Pane):
def __init__(self: PopupWindow,
parent: any,
header: str = 'Alert',
content: [str] = [],
footer: str = 'ESC: close',
style: any = None):
super().__init__(parent=(parent or curses.newpad(0, 0)),
height=1,
width=1,
y=10,
x=10)
# Pop-up window properties
self.header = header
self.content = content
self.footer = footer
self.enabled = False

# Parent window dimensions (Usually should be STDOUT directly)
p_height, p_width = self.parent.getmaxyx()

# Break lines as necessary
self.content = self.break_lines(int(2 * p_width / 3), self.content)

# UI dimensions
p_height, p_width = self.parent.getmaxyx()
self.v_height = (len(self.content)) + 2
width = len(self.header) + 2
if(len(self.content) > 0):
width = max(width, max(list(map(lambda x: len(x), self.content))))
self.v_width = width + 4
self.y = int(((p_height + self.v_height) / 2) - self.v_height)
self.x = int(((p_width + self.v_width) / 2) - self.v_width)

# UI properties
self.style = (style or curses.color_pair(0))
self._pad.attron(self.style)

def break_lines(self: PopupWindow,
max_width: int,
content: [str]) -> [str]:
# Determine if some lines of content need to be broken up
for i, line in enumerate(content):
length = len(line)
mid = int(length / 2)

if(length >= max_width):
# Break the line at the next available space
for j, c in enumerate(line[mid - 1:]):
if(c == ' '):
mid += j
break

# Apply the line break to the content array
content.pop(i)
content.insert(i, line[:mid - 1])
content.insert(i + 1, line[mid:])
return content

def toggle(self: PopupWindow) -> bool:
self.enabled = not self.enabled
return self.enabled

def __draw_header(self: PopupWindow) -> None:
self.add_line(0, 1, self.header, underline=True)

def __draw__footer(self: PopupWindow) -> None:
f_width = len(self.footer) + 2
self.add_line(self.v_height - 1,
self.v_width - f_width,
self.footer,
underline=True)

def draw(self: PopupWindow) -> None:
if(self.enabled):
super().resize(self.v_height, self.v_width)
super().draw()
self.__draw_header()

for i, line in enumerate(self.content):
self.add_line(1 + i, 2, line)

self.__draw__footer()
super().refresh()
else:
# super().clear()
...
Loading

0 comments on commit dc1cacf

Please sign in to comment.