diff --git a/canopen_monitor/__init__.py b/canopen_monitor/__init__.py index 2a5e141..6a4401d 100755 --- a/canopen_monitor/__init__.py +++ b/canopen_monitor/__init__.py @@ -1,8 +1,8 @@ import os MAJOR = 3 -MINOR = 2 -PATCH = 3 +MINOR = 3 +PATCH = 0 APP_NAME = 'canopen-monitor' APP_DESCRIPTION = 'An NCurses-based TUI application for tracking activity' \ diff --git a/canopen_monitor/__main__.py b/canopen_monitor/__main__.py index 03ccd6b..10f2c5c 100755 --- a/canopen_monitor/__main__.py +++ b/canopen_monitor/__main__.py @@ -70,7 +70,7 @@ def main(): # Start the can bus and the curses app with MagicCANBus(args.interfaces, no_block=args.no_block) as bus, \ - App(mt, eds_configs) as app: + App(mt, eds_configs, bus) as app: while True: # Bus updates for message in bus: @@ -78,7 +78,7 @@ def main(): mt += message # User Input updates - app._handle_keyboard_input() + app.handle_keyboard_input() # Draw update app.draw(bus.statuses) diff --git a/canopen_monitor/app.py b/canopen_monitor/app.py index 1ecfa08..cd07631 100644 --- a/canopen_monitor/app.py +++ b/canopen_monitor/app.py @@ -1,13 +1,14 @@ from __future__ import annotations import curses +import curses.ascii import datetime as dt from easygui import fileopenbox from shutil import copy from enum import Enum from . import APP_NAME, APP_VERSION, APP_LICENSE, APP_AUTHOR, APP_DESCRIPTION, \ APP_URL, CACHE_DIR -from .can import MessageTable, MessageType -from .ui import MessagePane, PopupWindow +from .can import MessageTable, MessageType, MagicCANBus +from .ui import MessagePane, PopupWindow, InputPopup, SelectionPopup from .parse import eds # Key Constants not defined in curses @@ -53,6 +54,10 @@ class KeyMap(Enum): F2 = {'name': 'F2', 'description': 'Toggle this menu', 'key': curses.KEY_F2} F3 = {'name': 'F3', 'description': 'Toggle eds file select', 'key': curses.KEY_F3} + F4 = {'name': 'F4', 'description': 'Toggle add interface', + 'key': curses.KEY_F4} + F5 = {'name': 'F5', 'description': 'Toggle remove interface', + 'key': curses.KEY_F5} UP_ARR = {'name': 'Up Arrow', 'description': 'Scroll pane up 1 row', 'key': curses.KEY_UP} DOWN_ARR = {'name': 'Down Arrow', 'description': 'Scroll pane down 1 row', @@ -90,7 +95,8 @@ class App: :type selected_pane: MessagePane """ - def __init__(self: App, message_table: MessageTable, eds_configs: dict): + def __init__(self: App, message_table: MessageTable, eds_configs: dict, + bus: MagicCANBus): """ App Initialization function :param message_table: Reference to shared message table object @@ -98,6 +104,7 @@ def __init__(self: App, message_table: MessageTable, eds_configs: dict): """ self.table = message_table self.eds_configs = eds_configs + self.bus = bus self.selected_pane_pos = 0 self.selected_pane = None self.key_dict = { @@ -114,7 +121,9 @@ def __init__(self: App, message_table: MessageTable, eds_configs: dict): KeyMap.RESIZE.value['key']: self.resize, KeyMap.F1.value['key']: self.f1, KeyMap.F2.value['key']: self.f2, - KeyMap.F3.value['key']: self.f3 + KeyMap.F3.value['key']: self.f3, + KeyMap.F4.value['key']: self.f4, + KeyMap.F5.value['key']: self.f5, } def __enter__(self: App) -> App: @@ -130,6 +139,7 @@ def __enter__(self: App) -> App: self.screen.scrollok(True) # Enable window scroll self.screen.keypad(True) # Enable special key input self.screen.nodelay(True) # Disable user-input blocking + curses.noecho() # disable user-input echo curses.curs_set(False) # Disable the cursor self.__init_color_pairs() # Enable colors and create pairs @@ -157,6 +167,14 @@ def __enter__(self: App) -> App: list(KeyMap))), footer='F2: exit window', style=curses.color_pair(1)) + self.add_if_win = InputPopup(self.screen, + header='Add Interface', + footer='ENTER: save, F4: exit window', + style=curses.color_pair(1)) + self.remove_if_win = SelectionPopup(self.screen, + header='Remove Interface', + footer='ENTER: remove, F5: exit window', + style=curses.color_pair(1)) self.hb_pane = MessagePane(cols={'Node ID': ('node_name', 0, hex), 'State': ('state', 0), 'Status': ('message', 0)}, @@ -186,6 +204,8 @@ def __enter__(self: App) -> App: name='Miscellaneous', message_table=self.table) self.__select_pane(self.hb_pane, 0) + self.popups = [self.hotkeys_win, self.info_win, self.add_if_win, + self.remove_if_win] return self def __exit__(self: App, type, value, traceback) -> None: @@ -200,7 +220,7 @@ def __exit__(self: App, type, value, traceback) -> None: """ # Monitor destruction, restore terminal state curses.nocbreak() # Re-enable line-buffering - curses.noecho() # Enable user-input echo + curses.echo() # Enable user-input echo curses.curs_set(True) # Enable the cursor curses.resetty() # Restore the terminal state curses.endwin() # Destroy the virtual screen @@ -275,20 +295,14 @@ def f1(self): Toggle app info menu :return: None """ - if self.hotkeys_win.enabled: - self.hotkeys_win.toggle() - self.hotkeys_win.clear() - self.info_win.toggle() + self.toggle_popup(self.info_win) def f2(self): """ Toggles KeyMap :return: None """ - if self.info_win.enabled: - self.info_win.toggle() - self.info_win.clear() - self.hotkeys_win.toggle() + self.toggle_popup(self.hotkeys_win) def f3(self): """ @@ -306,14 +320,56 @@ def f3(self): copy(filepath, CACHE_DIR) self.eds_configs[file.node_id] = file + def f4(self) -> None: + """ + Toggles Add Interface Popup + :return: None + """ + self.toggle_popup(self.add_if_win) + + def f5(self) -> None: + """ + Toggles Remove Interface Popup + :return: None + """ + self.remove_if_win.content = self.bus.interface_list + self.toggle_popup(self.remove_if_win) + + def toggle_popup(self, selected_popup) -> None: + for popup in self.popups: + if popup != selected_popup and popup.enabled: + popup.toggle() + popup.clear() - def _handle_keyboard_input(self: App) -> None: + selected_popup.toggle() + + def handle_keyboard_input(self: App) -> None: """ Retrieves keyboard input and calls the associated key function """ keyboard_input = self.screen.getch() curses.flushinp() + if self.add_if_win.enabled: + if keyboard_input == curses.KEY_ENTER or \ + keyboard_input == 10 or keyboard_input == 13: + value = self.add_if_win.get_value() + if value != "": + self.bus.add_interface(value) + self.add_if_win.toggle() + else: + self.add_if_win.read_input(keyboard_input) + + elif self.remove_if_win.enabled: + if keyboard_input == curses.KEY_ENTER or \ + keyboard_input == 10 or keyboard_input == 13: + value = self.remove_if_win.get_value() + if value != "": + self.bus.remove_interface(value) + self.remove_if_win.toggle() + else: + self.remove_if_win.read_input(keyboard_input) + try: self.key_dict[keyboard_input]() except KeyError: @@ -372,7 +428,8 @@ def __draw__footer(self: App) -> None: :return: None """ height, width = self.screen.getmaxyx() - footer = ': Info, : Hotkeys, : Add OD File' + footer = ': Info, : Hotkeys, : Add OD File, ' \ + ': Add Interface, Remove Interface' self.screen.addstr(height - 1, 1, footer) def draw(self: App, ifaces: [tuple]) -> None: @@ -381,7 +438,7 @@ def draw(self: App, ifaces: [tuple]) -> None: :param ifaces: CAN Bus Interfaces :return: None """ - window_active = self.info_win.enabled or self.hotkeys_win.enabled + window_active = any(popup.enabled for popup in self.popups) self.__draw_header(ifaces) # Draw header info # Draw panes @@ -390,8 +447,8 @@ def draw(self: App, ifaces: [tuple]) -> None: self.misc_pane.draw() # Draw windows - self.info_win.draw() - self.hotkeys_win.draw() + for popup in self.popups: + popup.draw() self.__draw__footer() diff --git a/canopen_monitor/can/interface.py b/canopen_monitor/can/interface.py index ca5b9ae..127fa6d 100644 --- a/canopen_monitor/can/interface.py +++ b/canopen_monitor/can/interface.py @@ -101,7 +101,7 @@ def restart(self: Interface) -> None: >>> iface.start() """ self.stop() - self.start() + self.start(False) def recv(self: Interface) -> Message: """A wrapper for `pyvit.hw.SocketCanDev.recv()` diff --git a/canopen_monitor/can/magic_can_bus.py b/canopen_monitor/can/magic_can_bus.py index c97a941..6e952a8 100644 --- a/canopen_monitor/can/magic_can_bus.py +++ b/canopen_monitor/can/magic_can_bus.py @@ -16,10 +16,9 @@ class MagicCANBus: def __init__(self: MagicCANBus, if_names: [str], no_block: bool = False): self.interfaces = list(map(lambda x: Interface(x), if_names)) self.no_block = no_block - self.keep_alive = t.Event() - self.keep_alive.set() + self.keep_alive_list = dict() self.message_queue = queue.SimpleQueue() - self.threads = None + self.threads = [] @property def statuses(self: MagicCANBus) -> [tuple]: @@ -32,8 +31,55 @@ def statuses(self: MagicCANBus) -> [tuple]: """ return list(map(lambda x: (x.name, x.is_up), self.interfaces)) + @property + def interface_list(self: MagicCANBus) -> [str]: + """A list of strings representing all interfaces + :return: a list of strings indicating the name of each interface + :rtype: [str] + """ + return list(map(lambda x: str(x), self.interfaces)) + + def add_interface(self: MagicCANBus, interface: str) -> None: + """This will add an interface at runtime + + :param interface: The name of the interface to add + :type interface: string""" + + # Check if interface is already existing + interface_names = self.interface_list + if interface in interface_names: + return + + new_interface = Interface(interface) + self.interfaces.append(new_interface) + self.threads.append(self.start_handler(new_interface)) + + def remove_interface(self: MagicCANBus, interface: str) -> None: + """This will remove an interface at runtime + + :param interface: The name of the interface to remove + :type interface: string""" + + # Check if interface is already existing + interface_names = self.interface_list + if interface not in interface_names: + return + + self.keep_alive_list[interface].clear() + for thread in self.threads: + if thread.name == f'canopen-monitor-{interface}': + thread.join() + self.threads.remove(thread) + del self.keep_alive_list[interface] + + for existing_interface in self.interfaces: + if str(existing_interface) == interface: + self.interfaces.remove(existing_interface) + def start_handler(self: MagicCANBus, iface: Interface) -> t.Thread: """This is a wrapper for starting a single interface listener thread + This wrapper also creates a keep alive event for each thread which + can be used to kill the thread. .. warning:: @@ -50,8 +96,11 @@ def start_handler(self: MagicCANBus, iface: Interface) -> t.Thread: :return: The new listener thread spawned :rtype: threading.Thread """ + self.keep_alive_list[iface.name] = t.Event() + self.keep_alive_list[iface.name].set() + tr = t.Thread(target=self.handler, - name=f'canopem-monitor-{iface.name}', + name=f'canopen-monitor-{iface.name}', args=[iface], daemon=True) tr.start() @@ -67,25 +116,25 @@ def handler(self: MagicCANBus, iface: Interface) -> None: :param iface: The interface to bind to when listening for messages :type iface: Interface """ - iface.start() # The outer loop exists to enable interface recovery, if the interface # is either deleted or goes down, the handler will try to start it # again and read messages as soon as possible - while(self.keep_alive.is_set()): + while (self.keep_alive_list[iface.name].is_set()): try: # The inner loop is the constant reading of the bus and loading # of frames into a thread-safe queue. It is necessary to # check `iface.is_up` in the inner loop as well, so that the # handler will not block on bus reading if the MCB is trying # to close all threads and destruct itself - while(self.keep_alive.is_set() and iface.is_up): + while (iface.is_up and iface.running and + self.keep_alive_list[iface.name].is_set()): frame = iface.recv() - if(frame is not None): + if (frame is not None): self.message_queue.put(frame, block=True) iface.restart() except OSError: - iface.restart() + pass iface.stop() def __enter__(self: MagicCANBus) -> MagicCANBus: @@ -97,8 +146,9 @@ def __exit__(self: MagicCANBus, etype: str, evalue: str, traceback: any) -> None: - self.keep_alive.clear() - if(self.no_block): + for keep_alive in self.keep_alive_list.values(): + keep_alive.clear() + if (self.no_block): print('WARNING: Skipping wait-time for threads to close' ' gracefully.') else: @@ -112,7 +162,7 @@ def __iter__(self: MagicCANBus) -> MagicCANBus: return self def __next__(self: MagicCANBus) -> Message: - if(self.message_queue.empty()): + if (self.message_queue.empty()): raise StopIteration return self.message_queue.get(block=True) @@ -122,5 +172,4 @@ def __str__(self: MagicCANBus) -> str: if_list = ', '.join(list(map(lambda x: str(x), self.interfaces))) return f"Magic Can Bus: {if_list}," \ f" pending messages: {self.message_queue.qsize()}" \ - f" threads: {alive_threads}," \ - f" keep-alive: {self.keep_alive.is_set()}" + f" threads: {alive_threads}" diff --git a/canopen_monitor/ui/__init__.py b/canopen_monitor/ui/__init__.py index 266c6f5..e82beb4 100755 --- a/canopen_monitor/ui/__init__.py +++ b/canopen_monitor/ui/__init__.py @@ -2,11 +2,13 @@ of Curses UI and general user interaction with the app, """ from .pane import Pane -from .windows import PopupWindow +from .windows import PopupWindow, InputPopup, SelectionPopup from .message_pane import MessagePane __all__ = [ "Pane", "MessagePane", - "PopupWindow" + "PopupWindow", + "InputPopup", + "SelectionPopup", ] diff --git a/canopen_monitor/ui/windows.py b/canopen_monitor/ui/windows.py index 825871f..00fa039 100755 --- a/canopen_monitor/ui/windows.py +++ b/canopen_monitor/ui/windows.py @@ -1,5 +1,7 @@ from __future__ import annotations import curses +import curses.ascii + from .pane import Pane @@ -59,10 +61,12 @@ def break_lines(self: PopupWindow, length = len(line) mid = int(length / 2) - self.determine_to_break_content(content, i, length, line, max_width, mid) + self.determine_to_break_content(content, i, length, line, max_width, + mid) return content - def determine_to_break_content(self, content, i, length, line, max_width, mid): + def determine_to_break_content(self, content, i, length, line, max_width, + mid): if (length >= max_width): # Break the line at the next available space for j, c in enumerate(line[mid - 1:]): @@ -99,7 +103,7 @@ def __draw_content(self): self.add_line(1 + i, 2, line) def draw(self: PopupWindow) -> None: - if(self.enabled): + if (self.enabled): super().resize(self.v_height, self.v_width) super().draw() self.__draw_header() @@ -109,3 +113,158 @@ def draw(self: PopupWindow) -> None: else: # super().clear() ... + + +class InputPopup(PopupWindow): + """ + Input form creates a popup window for retrieving + text input from the user + + :param parent: parent ui element + :type: any + :param header: header text of popup window + :type: str + :param footer: footer text of popup window + :type: str + :param style: style of window + :type: any + :param input_len: Maximum length of input text + :type: int + """ + + def __init__(self: InputPopup, + parent: any, + header: str = 'Alert', + footer: str = 'ESC: close', + style: any = None, + input_len: int = 30, + ): + + self.input_len = input_len + content = [" " * self.input_len] + super().__init__(parent, header, content, footer, style) + self.cursor_loc = 0 + + def read_input(self, keyboard_input: int) -> None: + """ + Read process keyboard input (ascii or backspace) + + :param keyboard_input: curses input character value from curses.getch + :type: int + """ + if curses.ascii.isalnum(keyboard_input) and \ + self.cursor_loc < self.input_len: + temp = list(self.content[0]) + temp[self.cursor_loc] = chr(keyboard_input) + self.content[0] = "".join(temp) + self.cursor_loc += 1 + elif keyboard_input == curses.KEY_BACKSPACE and self.cursor_loc > 0: + self.cursor_loc -= 1 + temp = list(self.content[0]) + temp[self.cursor_loc] = " " + self.content[0] = "".join(temp) + + def toggle(self: InputPopup) -> bool: + """ + Toggle window and clear inserted text + :return: value indicating whether the window is enabled + :type: bool + """ + self.content = [" " * self.input_len] + self.cursor_loc = 0 + return super().toggle() + + def get_value(self) -> str: + """ + Get the value of user input without trailing spaces + :return: user input value + :type: str + """ + return self.content[0].strip() + + +class SelectionPopup(PopupWindow): + """ + Input form creates a popup window for selecting + from a list of options + + :param parent: parent ui element + :type: any + :param header: header text of popup window + :type: str + :param footer: footer text of popup window + :type: str + :param style: style of window + :type: any + """ + def __init__(self: SelectionPopup, + parent: any, + header: str = 'Alert', + footer: str = 'ESC: close', + style: any = None, + ): + content = [" " * 40] + super().__init__(parent, header, content, footer, style) + self.cursor_loc = 0 + + def read_input(self: SelectionPopup, keyboard_input: int) -> None: + """ + Read process keyboard input (ascii or backspace) + + :param keyboard_input: curses input character value from curses.getch + :type: int + """ + if keyboard_input == curses.KEY_UP and self.cursor_loc > 0: + self.cursor_loc -= 1 + elif keyboard_input == curses.KEY_DOWN and self.cursor_loc < len( + self.content) - 1: + self.cursor_loc += 1 + + def __draw_header(self: SelectionPopup) -> None: + """Add the header line to the window""" + self.add_line(0, 1, self.header, underline=True) + + def __draw__footer(self: SelectionPopup) -> None: + """Add the footer to the window""" + f_width = len(self.footer) + 2 + self.add_line(self.v_height - 1, + self.v_width - f_width, + self.footer, + underline=True) + + def __draw_content(self: SelectionPopup) -> None: + """Read each line of the content and add to the window""" + for i, line in enumerate(self.content): + self.add_line(1 + i, 2, line, highlight=i == self.cursor_loc) + + def draw(self: SelectionPopup) -> None: + if (self.enabled): + super().resize(len(self.content)+2, self.v_width) + if self.needs_refresh: + self.refresh() + + self.parent.attron(self._style) + self._pad.attron(self._style) + + if (self.border): + self._pad.box() + self.__draw_header() + self.__draw_content() + self.__draw__footer() + super().refresh() + + def toggle(self: InputPopup) -> bool: + """ + Toggle window and reset selected item + :return: value indicating whether the window is enabled + :type: bool + """ + self.cursor_loc = 0 + self.clear() + return super().toggle() + + def get_value(self): + if self.cursor_loc >= len(self.content): + return "" + + return self.content[self.cursor_loc] diff --git a/scripts/socketcan-dev.py b/scripts/socketcan-dev.py old mode 100644 new mode 100755 diff --git a/tests/spec_magic_can_bus.py b/tests/spec_magic_can_bus.py index a828487..56c3196 100644 --- a/tests/spec_magic_can_bus.py +++ b/tests/spec_magic_can_bus.py @@ -57,6 +57,6 @@ def test_str(self): of the bus """ expected = 'Magic Can Bus: vcan0, vcan1, pending messages:' \ - ' 0 threads: 0, keep-alive: True' + ' 0 threads: 0' actual = str(self.bus) self.assertEqual(expected, actual)