diff --git a/README.md b/README.md index 8b8567d..b759c4c 100755 --- a/README.md +++ b/README.md @@ -31,9 +31,13 @@ An NCurses-based TUI application for tracking activity over the CAN bus and deco *** # Configuration +The default configurations provided by CANOpen Monitor can be found in +[canopen_monitor/assets](./canopen_monitor/assets). These are the default +assets provided. At runtime these configs are copied to +`~/.config/canopen-monitor` where they can be modified and the changes +will persist. -The default configurations provided by CANOpen Monitor can be found in [canopen_monitor/assets](./canopen_monitor/assets). These are the default assets provided. At runtime these configs are copied to `~/.config/canopen-monitor` where they can be modified and the changes will persist. - +EDS files are loaded from `~/.cache/canopen-monitor` *** # Development and Contribution @@ -42,13 +46,39 @@ The default configurations provided by CANOpen Monitor can be found in [canopen_ Check out our [Read The Docs](https://canopen-monitor.readthedocs.io) pages for more info on the application sub-components and methods. +### Pre-Requisites +* Ubuntu/Debian Linux System + +* Python 3.8.5 or higher (pyenv is recommended for managing different python versions, https://realpython.com/intro-to-pyenv/#build-dependencies) + ### Install Locally +#### Setup a virtual CAN signal generator +`$` `sudo apt-get install can-utils` + +#### Start a virtual CAN +`$` `sudo ip link add dev vcan0 type vcan` + +`$` `sudo ip link set up vcan0` + +#### Clone the repo +`$` `git clone https://github.com/Boneill3/CANopen-monitor.git` + +`$` `cd CANopen-monitor` + `$` `pip install -e .[dev]` *(Note: the `-e` flag creates a symbolic-link to your local development version. Set it once, and forget it)* -### Create Documentation Locally +#### Generate random messages with socketcan-dev +`$` `chmod 700 socketcan-dev` + +`$` `./socketcan-dev.py --random-id --random-message -r` + +#### Start the monitor +`$` `canopen-monitor` + +### Create documentation locally `$` `make -C docs clean html` diff --git a/canopen_monitor/__init__.py b/canopen_monitor/__init__.py index a19f6fa..2a5e141 100755 --- a/canopen_monitor/__init__.py +++ b/canopen_monitor/__init__.py @@ -2,7 +2,7 @@ MAJOR = 3 MINOR = 2 -PATCH = 2 +PATCH = 3 APP_NAME = 'canopen-monitor' APP_DESCRIPTION = 'An NCurses-based TUI application for tracking activity' \ diff --git a/canopen_monitor/app.py b/canopen_monitor/app.py index e2d1a36..9112601 100644 --- a/canopen_monitor/app.py +++ b/canopen_monitor/app.py @@ -2,48 +2,108 @@ import curses import datetime as dt from enum import Enum -from . import APP_NAME, APP_VERSION, APP_LICENSE, APP_AUTHOR, APP_DESCRIPTION, APP_URL +from . import APP_NAME, APP_VERSION, APP_LICENSE, APP_AUTHOR, APP_DESCRIPTION, \ + APP_URL from .can import MessageTable, MessageType from .ui import MessagePane, PopupWindow +# Key Constants not defined in curses +# _UBUNTU key constants work in Ubuntu +KEY_S_UP = 337 +KEY_S_DOWN = 336 +KEY_C_UP = 567 +KEY_C_UP_UBUNTU = 566 +KEY_C_DOWN = 526 +KEY_C_DOWN_UBUNTU = 525 + +# Additional User Interface Related Constants +VERTICAL_SCROLL_RATE = 16 +HORIZONTAL_SCROLL_RATE = 4 + def pad_hex(value: int) -> str: + """ + Convert integer value to a hex string with padding + :param value: number of spaces to pad hex value + :return: padded string + """ return f'0x{hex(value).upper()[2:].rjust(3, "0")}' class KeyMap(Enum): - F1 = ('F1', 'Toggle app info menu', curses.KEY_F1) - F2 = ('F2', 'Toggle this menu', curses.KEY_F2) - UP_ARR = ('Up Arrow', 'Scroll pane up 1 row', curses.KEY_UP) - DOWN_ARR = ('Down Arrow', 'Scroll pane down 1 row', curses.KEY_DOWN) - LEFT_ARR = ('Left Arrow', 'Scroll pane left 4 cols', curses.KEY_LEFT) - RIGHT_ARR = ('Right Arrow', 'Scroll pane right 4 cols', curses.KEY_RIGHT) - S_UP_ARR = ('Shift + Up Arrow', 'Scroll pane up 16 rows', 337) - S_DOWN_ARR = ('Shift + Down Arrow', 'Scroll pane down 16 rows', 336) - C_UP_ARR = ('Ctrl + Up Arrow', 'Move pane selection up', 567) - C_DOWN_ARR = ('Ctrl + Down Arrow', 'Move pane selection down', 526) - RESIZE = ('Resize Terminal', - 'Reset the dimensions of the app', - curses.KEY_RESIZE) + """ + Enumerator of valid keyboard input + value[0]: input name + value[1]: input description + value[2]: curses input value key + """ + + F1 = {'name':'F1','description':'Toggle app info menu','key' : curses.KEY_F1} + F2 = {'name':'F2', 'description':'Toggle this menu', 'key': curses.KEY_F2} + UP_ARR = {'name':'Up Arrow', 'description':'Scroll pane up 1 row', 'key':curses.KEY_UP} + DOWN_ARR = {'name':'Down Arrow', 'description':'Scroll pane down 1 row', 'key':curses.KEY_DOWN} + LEFT_ARR = {'name':'Left Arrow', 'description':'Scroll pane left 4 cols', 'key':curses.KEY_LEFT} + RIGHT_ARR = {'name':'Right Arrow', 'description':'Scroll pane right 4 cols', 'key':curses.KEY_RIGHT} + S_UP_ARR = {'name':'Shift + Up Arrow', 'description':'Scroll pane up 16 rows', 'key':KEY_S_UP} + S_DOWN_ARR ={'name':'Shift + Down Arrow', 'description':'Scroll pane down 16 rows', 'key':KEY_S_DOWN} + C_UP_ARR ={'name':'Ctrl + Up Arrow', 'description':'Move pane selection up', 'key':[KEY_C_UP, KEY_C_UP_UBUNTU]} + C_DOWN_ARR ={'name':'Ctrl + Down Arrow', 'description':'Move pane selection down', 'key':[KEY_C_DOWN, KEY_C_DOWN_UBUNTU]} + RESIZE ={'name':'Resize Terminal', 'description':'Reset the dimensions of the app', 'key':curses.KEY_RESIZE} class App: - """The User Interface + """ + The User Interface Container + :param table + :type MessageTable + + :param selected_pane_pos index of currently selected pane + :type int + + :param selected_pane reference to currently selected Pane + :type MessagePane """ def __init__(self: App, message_table: MessageTable): + """ + App Initialization function + :param message_table: Reference to shared message table object + :type MessageTable + """ self.table = message_table self.selected_pane_pos = 0 self.selected_pane = None + self.key_dict = { + KeyMap.UP_ARR.value['key']: self.up, + KeyMap.S_UP_ARR.value['key']: self.shift_up, + KeyMap.C_UP_ARR.value['key'][0]: self.ctrl_up, + KeyMap.C_UP_ARR.value['key'][1]: self.ctrl_up, # Ubuntu key + KeyMap.DOWN_ARR.value['key']: self.down, + KeyMap.S_DOWN_ARR.value['key']: self.shift_down, + KeyMap.C_DOWN_ARR.value['key'][0]: self.ctrl_down, + KeyMap.C_DOWN_ARR.value['key'][1]: self.ctrl_down, # Ubuntu key + KeyMap.LEFT_ARR.value['key']: self.left, + KeyMap.RIGHT_ARR.value['key']: self.right, + KeyMap.RESIZE.value['key']: self.resize, + KeyMap.F1.value['key']: self.f1, + KeyMap.F2.value['key']: self.f2 + } - def __enter__(self: App): + def __enter__(self: App) -> App: + """ + Enter the runtime context related to this object + Create the user interface layout. Any changes to the layout should + be done here. + :return: self + :type App + """ # Monitor setup, take a snapshot of the terminal state self.screen = curses.initscr() # Initialize standard out - self.screen.scrollok(True) # Enable window scroll - self.screen.keypad(True) # Enable special key input - self.screen.nodelay(True) # Disable user-input blocking - curses.curs_set(False) # Disable the cursor - self.__init_color_pairs() # Enable colors and create pairs + self.screen.scrollok(True) # Enable window scroll + self.screen.keypad(True) # Enable special key input + self.screen.nodelay(True) # Disable user-input blocking + curses.curs_set(False) # Disable the cursor + self.__init_color_pairs() # Enable colors and create pairs # Don't initialize any grids, sub-panes, or windows until standard io # screen has been initialized @@ -63,10 +123,10 @@ def __enter__(self: App): self.hotkeys_win = PopupWindow(self.screen, header='Hotkeys', content=list( - map(lambda x: - f'{x.value[0]}: {x.value[1]}' - f' ({x.value[2]})', - list(KeyMap))), + map(lambda x: + f'{x.value["name"]}: {x.value["description"]}' + f' ({x.value["key"]})', + list(KeyMap))), footer='F2: exit window', style=curses.color_pair(1)) self.hb_pane = MessagePane(cols={'Node ID': ('node_name', 0, hex), @@ -101,56 +161,124 @@ def __enter__(self: App): return self def __exit__(self: App, type, value, traceback) -> None: + """ + Exit the runtime context related to this object. + Cleanup any curses settings to allow the terminal + to return to normal + :param type: exception type or None + :param value: exception value or None + :param traceback: exception traceback or None + :return: None + """ # Monitor destruction, restore terminal state - curses.nocbreak() # Re-enable line-buffering - curses.noecho() # Enable user-input echo - curses.curs_set(True) # Enable the cursor - curses.resetty() # Restore the terminal state - curses.endwin() # Destroy the virtual screen + curses.nocbreak() # Re-enable line-buffering + curses.noecho() # Enable user-input echo + curses.curs_set(True) # Enable the cursor + curses.resetty() # Restore the terminal state + curses.endwin() # Destroy the virtual screen - def _handle_keyboard_input(self: App) -> None: - """This is only a temporary implementation + def up(self): + """ + Up arrow key scrolls pane up 1 row + :return: None + """ + self.selected_pane.scroll_up() - .. deprecated:: + def shift_up(self): + """ + Shift + Up arrow key scrolls pane up 16 rows + :return: None + """ + self.selected_pane.scroll_up(rate=VERTICAL_SCROLL_RATE) - Soon to be removed + def ctrl_up(self): """ - # Grab user input - input = self.screen.getch() - curses.flushinp() + Ctrl + Up arrow key moves pane selection up + :return: None + """ + self.__select_pane(self.hb_pane, 0) - if(input == curses.KEY_UP): - self.selected_pane.scroll_up() - elif(input == curses.KEY_DOWN): - self.selected_pane.scroll_down() - elif(input == 337): # Shift + Up - self.selected_pane.scroll_up(rate=16) - elif(input == 336): # Shift + Down - self.selected_pane.scroll_down(rate=16) - elif(input == curses.KEY_LEFT): - self.selected_pane.scroll_left(rate=4) - elif(input == curses.KEY_RIGHT): - self.selected_pane.scroll_right(rate=4) - elif(input == curses.KEY_RESIZE): - self.hb_pane._reset_scroll_positions() - self.misc_pane._reset_scroll_positions() - self.screen.clear() - elif(input == 567): # Ctrl + Up - self.__select_pane(self.hb_pane, 0) - elif(input == 526): # Ctrl + Down - self.__select_pane(self.misc_pane, 1) - elif(input == curses.KEY_F1): - if(self.hotkeys_win.enabled): - self.hotkeys_win.toggle() - self.hotkeys_win.clear() - self.info_win.toggle() - elif(input == curses.KEY_F2): - if(self.info_win.enabled): - self.info_win.toggle() - self.info_win.clear() + def down(self): + """ + Down arrow key scrolls pane down 1 row + :return: None + """ + self.selected_pane.scroll_down() + + def shift_down(self): + """ + Shift + Down arrow key scrolls down pane 16 rows + :return: + """ + self.selected_pane.scroll_down(rate=VERTICAL_SCROLL_RATE) + + def ctrl_down(self): + """ + Ctrl + Down arrow key moves pane selection down + :return: None + """ + self.__select_pane(self.misc_pane, 1) + + def left(self): + """ + Left arrow key scrolls pane left 4 cols + :return: None + """ + self.selected_pane.scroll_left(rate=HORIZONTAL_SCROLL_RATE) + + def right(self): + """ + Right arrow key scrolls pane right 4 cols + :return: None + """ + self.selected_pane.scroll_right(rate=HORIZONTAL_SCROLL_RATE) + + def resize(self): + """ + Resets the dimensions of the app + :return: None + """ + self.hb_pane._reset_scroll_positions() + self.misc_pane._reset_scroll_positions() + self.screen.clear() + + def f1(self): + """ + Toggle app info menu + :return: None + """ + if self.hotkeys_win.enabled: self.hotkeys_win.toggle() + self.hotkeys_win.clear() + self.info_win.toggle() + + def f2(self): + """ + Toggles KeyMap + :return: None + """ + if self.info_win.enabled: + self.info_win.toggle() + self.info_win.clear() + self.hotkeys_win.toggle() + + def _handle_keyboard_input(self: App) -> None: + """ + Retrieves keyboard input and calls the associated key function + """ + keyboard_input = self.screen.getch() + curses.flushinp() + + try: + self.key_dict[keyboard_input]() + except KeyError: + ... def __init_color_pairs(self: App) -> None: + """ + Initialize color options used by curses + :return: None + """ curses.start_color() # Implied: color pair 0 is standard black and white curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) @@ -160,8 +288,14 @@ def __init_color_pairs(self: App) -> None: curses.init_pair(5, curses.COLOR_MAGENTA, curses.COLOR_BLACK) def __select_pane(self: App, pane: MessagePane, pos: int) -> None: + """ + Set Pane as Selected + :param pane: Reference to selected Pane + :param pos: Index of Selected Pane + :return: None + """ # Only undo previous selection if there was any - if(self.selected_pane is not None): + if (self.selected_pane is not None): self.selected_pane.selected = False # Select the new pane and change internal Pane state to indicate it @@ -170,6 +304,11 @@ def __select_pane(self: App, pane: MessagePane, pos: int) -> None: self.selected_pane.selected = True def __draw_header(self: App, ifaces: [tuple]) -> None: + """ + Draw the header at the top of the interface + :param ifaces: CAN Bus Interfaces + :return: None + """ # Draw the timestamp date_str = f'{dt.datetime.now().ctime()},' self.screen.addstr(0, 0, date_str) @@ -183,16 +322,25 @@ def __draw_header(self: App, ifaces: [tuple]) -> None: pos += sl + 1 def __draw__footer(self: App) -> None: + """ + Draw the footer at the bottom of the interface + :return: None + """ height, width = self.screen.getmaxyx() footer = ': Info, : Hotkeys' self.screen.addstr(height - 1, 1, footer) - def draw(self: App, ifaces: [tuple]): + def draw(self: App, ifaces: [tuple]) -> None: + """ + Draw the entire interface + :param ifaces: CAN Bus Interfaces + :return: None + """ window_active = self.info_win.enabled or self.hotkeys_win.enabled self.__draw_header(ifaces) # Draw header info # Draw panes - if(not window_active): + if (not window_active): self.hb_pane.draw() self.misc_pane.draw() @@ -200,7 +348,11 @@ def draw(self: App, ifaces: [tuple]): self.info_win.draw() self.hotkeys_win.draw() - self.__draw__footer() # Draw footer info + self.__draw__footer() - def refresh(self: App): + def refresh(self: App) -> None: + """ + Refresh entire screen + :return: None + """ self.screen.refresh() diff --git a/canopen_monitor/can/message.py b/canopen_monitor/can/message.py index 82fcd6e..263cf51 100644 --- a/canopen_monitor/can/message.py +++ b/canopen_monitor/can/message.py @@ -34,8 +34,12 @@ class MessageType(Enum): # Special Types UKNOWN = (-0x1, -0x1) # Pseudo type unknown - PDO = (0x180, 0x57F) # Super type PDO - SDO = (0x580, 0x680) # Super type SDO + PDO = (0x180, 0x57F) # Super type PDO + SDO = (0x580, 0x680) # Super type SDO + + def __init__(self, start, end): + self.start = start + self.end = end @property def supertype(self: MessageType) -> MessageType: @@ -49,17 +53,15 @@ def supertype(self: MessageType) -> MessageType: :return: The supertype of this type :rtype: MessageType """ - if(self.value[0] >= self.PDO.value[0] - and self.value[0] <= self.PDO.value[1]): + if self.PDO.start <= self.start <= self.PDO.end: return MessageType['PDO'] - elif(self.value[0] >= self.SDO.value[0] - and self.value[0] <= self.SDO.value[1]): + elif self.SDO.start <= self.start <= self.SDO.end: return MessageType['SDO'] else: return MessageType['UKNOWN'] @staticmethod - def cob_to_node(mtype: MessageType, cob_id: int) -> int: + def cob_to_node(msg_type: MessageType, cob_id: int) -> int: """Determines the Node ID based on the given COB ID The COB ID is the raw ID sent with the CAN message, and the node id is @@ -85,7 +87,7 @@ def cob_to_node(mtype: MessageType, cob_id: int) -> int: :return: The Node ID :rtype: int """ - return cob_id - mtype.value[0] + return cob_id - msg_type.start @staticmethod def cob_id_to_type(cob_id: int) -> MessageType: @@ -97,9 +99,9 @@ def cob_id_to_type(cob_id: int) -> MessageType: :return: The message type (range) the COB ID fits into :rtype: MessageType """ - for t in list(MessageType): - if(cob_id >= t.value[0] and cob_id <= t.value[1]): - return t + for msg_type in list(MessageType): + if msg_type.start <= cob_id <= msg_type.end: + return msg_type return MessageType['UKNOWN'] def __str__(self) -> str: @@ -124,8 +126,6 @@ class MessageState(Enum): DEAD = 'Dead' def __str__(self: MessageState) -> str: - """ Overloaded `str()` operator - """ return self.value + ' ' @@ -160,9 +160,9 @@ def state(self: Message) -> MessageState: :return: State of the message :rtype: MessageState """ - if(self.age >= DEAD_TIME): + if self.age >= DEAD_TIME: return MessageState['DEAD'] - elif(self.age >= STALE_TIME): + elif self.age >= STALE_TIME: return MessageState['STALE'] else: return MessageState['ALIVE'] diff --git a/canopen_monitor/parse/__init__.py b/canopen_monitor/parse/__init__.py index e8027a3..7c66ddd 100644 --- a/canopen_monitor/parse/__init__.py +++ b/canopen_monitor/parse/__init__.py @@ -1,9 +1,8 @@ """This module is primarily responsible for providing a high-level interface -for parsing CANOpen messages according to Ojbect Definiton files or Electronic +for parsing CANOpen messages according to Object Definiton files or Electronic Data Sheet files, provided by the end user. """ import enum -from re import finditer from .eds import EDS, load_eds_file from .canopen import CANOpenParser @@ -11,92 +10,4 @@ 'CANOpenParser', 'EDS', 'load_eds_file', -] - -data_types = {0x01: "BOOLEAN", - 0x02: "INTEGER8", - 0x03: "INTEGER16", - 0x04: "INTEGER32", - 0x05: "UNSIGNED8", - 0x06: "UNSIGNED16", - 0x07: "UNSIGNED32", - 0x08: "REAL32", - 0x09: "VISIBLE_STRING", - 0x0A: "OCTET_STRING", - 0x0B: "UNICODE_STRING", - 0x0F: "DOMAIN", - 0x11: "REAL64", - 0x15: "INTEGER64", - 0x1B: "UNSIGNED64"} - -node_names = {0x01: "C3", - 0x06: "Solar Panel", - 0x11: "SDR GPS", - 0x12: "Star Tracker", - 0x21: "OreSat Live", - 0x22: "Cirrus Flux Cameras", - 0x31: "Battery", - 0x32: "Test Board 1", - 0x33: "Test Board 2", - 0x40: "MDC"} - - -class DataTypes(enum.Enum): - BOOLEAN = 0x1 - INTEGER8 = 0x2 - INTEGER16 = 0x3 - INTEGER32 = 0x4 - UNSIGNED8 = 0x5 - UNSIGNED16 = 0x6 - UNSIGNED32 = 0x7 - REAL32 = 0x8 - VISIBLE_STRING = 0x9 - OCTET_STRING = 0xA - UNICODE_STRING = 0xB - DOMAIN = 0xF - REAL64 = 0x11 - INTEGER64 = 0x15 - UNSIGNED64 = 0x1B - - -object_types = {0x00: "NULL", - 0x02: "DOMAIN", - 0x05: "DEFTYPE", - 0x06: "DEFSTRUCT", - 0x07: "VAR", - 0x08: "ARRAY", - 0x09: "RECORD"} - - -def camel_to_snake(old_name: str) -> str: - new_name = '' - - for match in finditer('[A-Z0-9]+[a-z]*', old_name): - span = match.span() - substr = old_name[span[0]:span[1]] - # length = span[1] - span[0] - found_submatch = False - - for sub_match in finditer('[A-Z]+', substr): - sub_span = sub_match.span() - sub_substr = old_name[sub_span[0]:sub_span[1]] - sub_length = sub_span[1] - sub_span[0] - - if (sub_length > 1): - found_submatch = True - - if (span[0] != 0): - new_name += '_' - - first = sub_substr[:-1] - second = substr.replace(first, '') - - new_name += '{}_{}'.format(first, second).lower() - - if (not found_submatch): - if (span[0] != 0): - new_name += '_' - - new_name += substr.lower() - - return new_name +] \ No newline at end of file diff --git a/canopen_monitor/parse/canopen.py b/canopen_monitor/parse/canopen.py index 8d6177c..841a180 100644 --- a/canopen_monitor/parse/canopen.py +++ b/canopen_monitor/parse/canopen.py @@ -7,36 +7,53 @@ from .sdo import SDOParser from .utilities import FailedValidationError - class CANOpenParser: + """ + A convenience wrapper for the parse function + """ def __init__(self, eds_configs: dict): self.sdo_parser = SDOParser() self.eds_configs = eds_configs def parse(self, message: Message) -> str: + """ + Detect the type of the given message and return the parsed version + + Arguments + --------- + @:param: message: a Message object containing the message + + Returns + ------- + `str`: The parsed message + + """ node_id = message.node_id eds_config = self.eds_configs.get(hex(node_id)) \ if node_id is not None else None + # Detect message type and select the appropriate parse function if (message.type == MessageType.SYNC): - parse = SYNCParser.parse + parse_function = SYNCParser.parse elif (message.type == MessageType.EMER): - parse = EMCYParser.parse + parse_function = EMCYParser.parse elif (message.supertype == MessageType.PDO): - parse = PDOParser.parse + parse_function = PDOParser.parse elif (message.supertype == MessageType.SDO): if self.sdo_parser.is_complete: self.sdo_parser = SDOParser() - parse = self.sdo_parser.parse + parse_function = self.sdo_parser.parse elif (message.type == MessageType.HEARTBEAT): - parse = HBParser.parse + parse_function = HBParser.parse elif (message.type == MessageType.TIME): - parse = TIMEParser.parse + parse_function = TIMEParser.parse else: - parse = None + parse_function = None + # Call the parse function and save the result + # On error, return the message data try: - parsed_message = parse(message.arb_id, message.data, eds_config) + parsed_message = parse_function(message.arb_id, message.data, eds_config) except (FailedValidationError, TypeError): parsed_message = ' '.join(list(map(lambda x: hex(x)[2:] .upper() diff --git a/canopen_monitor/parse/eds.py b/canopen_monitor/parse/eds.py index 92dad30..c0857d1 100644 --- a/canopen_monitor/parse/eds.py +++ b/canopen_monitor/parse/eds.py @@ -2,6 +2,47 @@ from typing import Union import canopen_monitor.parse as cmp from dateutil.parser import parse as dtparse +from re import sub, finditer + + +def camel_to_snake(old_str: str) -> str: + """ + Converts camel cased string to snake case, counting groups of repeated capital letters (such as "PDO") as one unit + That is, string like "PDO_group" become "pdo_group" instead of "p_d_o_group" + """ + # Find all groups that contains one or more capital letters followed by one or more lowercase letters + # The new, camel_cased string will be built up along the way + new_str = "" + for match in finditer('[A-Z0-9]+[a-z]*', old_str): + span = match.span() + substr = old_str[span[0]:span[1]] + found_submatch = False + + # Add a "_" to the newstring to separate the current match group from the previous + # It looks like we shouldn't need to worry about getting "_strings_like_this", because they don't seem to happen + if (span[0] != 0): + new_str += '_' + + # Find all sub-groups of *more than one* capital letters within the match group, and seperate them with "_" characters, + # Append the subgroups to the new_str as they are found + # If no subgroups are found, just append the match group to the new_str + for sub_match in finditer('[A-Z]+', substr): + sub_span = sub_match.span() + sub_substr = old_str[sub_span[0]:sub_span[1]] + sub_length = sub_span[1] - sub_span[0] + + if (sub_length > 1): + found_submatch = True + + first = sub_substr[:-1] + second = substr.replace(first, '') + + new_str += '{}_{}'.format(first, second).lower() + + if (not found_submatch): + new_str += substr.lower() + + return new_str class Metadata: @@ -16,7 +57,7 @@ def __init__(self, data): key, value = e.split('=') # Create the proper field name - key = cmp.camel_to_snake(key) + key = camel_to_snake(key) # Turn date-time-like objects into datetimes if ('time' in key): @@ -60,7 +101,7 @@ def __init__(self, data, sub_id=None): elif(all(c in string.hexdigits for c in value)): value = int(value, 16) - self.__setattr__(cmp.camel_to_snake(key), value) + self.__setattr__(camel_to_snake(key), value) def add(self, index) -> None: self.sub_indices.append(index) @@ -99,7 +140,7 @@ def __init__(self, eds_data: [str]): .add(Index(section[1:], sub_id=int(id[1], 16))) else: name = section[0][1:-1] - self.__setattr__(cmp.camel_to_snake(name), + self.__setattr__(camel_to_snake(name), Metadata(section[1:])) prev = i + 1 self.node_id = self[0x2101].default_value diff --git a/canopen_monitor/parse/hb.py b/canopen_monitor/parse/hb.py index 905c2ed..4993e14 100644 --- a/canopen_monitor/parse/hb.py +++ b/canopen_monitor/parse/hb.py @@ -2,6 +2,7 @@ from .utilities import FailedValidationError from ..can import MessageType +STATE_BYTE_IDX = 0 def parse(cob_id: int, data: list, eds_config: EDS): """ @@ -9,7 +10,8 @@ def parse(cob_id: int, data: list, eds_config: EDS): Arguments --------- - @:param: data: a byte string containing the heartbeat message + @:param: data: a byte string containing the heartbeat message, + byte 0 is the heartbeat state info. Returns ------- @@ -21,8 +23,9 @@ def parse(cob_id: int, data: list, eds_config: EDS): 0x05: "Operational", 0x7F: "Pre-operational" } + node_id = MessageType.cob_to_node(MessageType.HEARTBEAT, cob_id) - if len(data) < 1 or data[0] not in states: + if len(data) < 1 or data[STATE_BYTE_IDX] not in states: raise FailedValidationError(data, node_id, cob_id, __name__, "Invalid heartbeat state detected") - return states.get(data[0]) + return states.get(data[STATE_BYTE_IDX]) diff --git a/canopen_monitor/parse/pdo.py b/canopen_monitor/parse/pdo.py index 9b24a95..650868e 100644 --- a/canopen_monitor/parse/pdo.py +++ b/canopen_monitor/parse/pdo.py @@ -2,6 +2,7 @@ from math import ceil, floor from .eds import EDS from .utilities import FailedValidationError, get_name, decode +from ..can import MessageType PDO1_TX = 0x1A00 PDO1_RX = 0x1600 @@ -22,36 +23,36 @@ def parse(cob_id: int, data: bytes, eds: EDS): The eds mapping is determined by the cob_id passed ot this function. That indicated which PDO record to look up in the EDS file. """ - if 0x180 <= cob_id < 0x200: # PDO1 tx + if MessageType.PDO1_TX.value[0] <= cob_id < MessageType.PDO1_RX.value[0]: # PDO1 tx pdo_type = PDO1_TX - elif 0x200 <= cob_id < 0x280: # PDO1 rx + elif MessageType.PDO1_RX.value[0] <= cob_id < MessageType.PDO2_TX.value[0]: # PDO1 rx pdo_type = PDO1_RX - elif 0x280 <= cob_id < 0x300: # PDO2 tx + elif MessageType.PDO2_TX.value[0] <= cob_id < MessageType.PDO2_RX.value[0]: # PDO2 tx pdo_type = PDO2_TX - elif 0x300 <= cob_id < 0x380: # PDO2 rx + elif MessageType.PDO2_RX.value[0] <= cob_id < MessageType.PDO3_TX.value[0]: # PDO2 rx pdo_type = PDO2_RX - elif 0x380 <= cob_id < 0x400: # PDO3 tx + elif MessageType.PDO3_TX.value[0] <= cob_id < MessageType.PDO3_RX.value[0]: # PDO3 tx pdo_type = PDO3_TX - elif 0x400 <= cob_id < 0x480: # PDO3 rx + elif MessageType.PDO3_RX.value[0] <= cob_id < MessageType.PDO4_TX.value[0]: # PDO3 rx pdo_type = PDO3_RX - elif 0x480 <= cob_id < 0x500: # PDO4 tx + elif MessageType.PDO4_TX.value[0] <= cob_id < MessageType.PDO4_RX.value[0]: # PDO4 tx pdo_type = PDO4_TX - elif 0x500 <= cob_id < 0x580: # PDO4 rx + elif MessageType.PDO4_RX.value[0] <= cob_id < (MessageType.PDO4_RX.value[1] + 1): # PDO4 rx pdo_type = PDO4_RX else: - raise FailedValidationError(data, cob_id - 0x180, cob_id, __name__, + raise FailedValidationError(data, cob_id - MessageType.PDO1_TX.value[0], cob_id, __name__, f"Unable to determine pdo type with given " f"cob_id {hex(cob_id)}, expected value " - f"between 0x180 and 0x580") + f"between MessageType.PDO1_TX.value[0] and MessageType.PDO4_RX.value[1] + 1") if len(data) > 8 or len(data) < 1: - raise FailedValidationError(data, cob_id - 0x180, cob_id, __name__, + raise FailedValidationError(data, cob_id - MessageType.PDO1_TX.value[0], cob_id, __name__, f"Invalid payload length {len(data)} " f"expected between 1 and 8") try: eds_elements = eds[hex(pdo_type)][0] except TypeError: - raise FailedValidationError(data, cob_id - 0x180, cob_id, __name__, + raise FailedValidationError(data, cob_id - MessageType.PDO1_TX.value[0], cob_id, __name__, f"Unable to find eds data for pdo type " f"{hex(pdo_type)}") @@ -66,12 +67,12 @@ def parse(cob_id: int, data: bytes, eds: EDS): if num_elements in (0xFE, 0xFF): if len(data) != 8: - raise FailedValidationError(data, cob_id - 0x180, cob_id, __name__, + raise FailedValidationError(data, cob_id - MessageType.PDO1_TX.value[0], cob_id, __name__, f"Invalid payload length {len(data)} " f"expected 8") return parse_mpdo(num_elements, pdo_type, eds, data, cob_id) - raise FailedValidationError(data, cob_id - 0x180, cob_id, __name__, + raise FailedValidationError(data, cob_id - MessageType.PDO1_TX.value[0], cob_id, __name__, f"Invalid pdo mapping detected in eds file at " f"[{pdo_type}sub0]") @@ -87,7 +88,7 @@ def parse_pdo(num_elements, pdo_type, cob_id, eds, data): try: eds_record = eds[hex(pdo_type)][i] except TypeError: - raise FailedValidationError(data, cob_id - 0x180, cob_id, __name__, + raise FailedValidationError(data, cob_id - MessageType.PDO1_TX.value[0], cob_id, __name__, f"Unable to find eds data for pdo type " f"{hex(pdo_type)} index {i}") @@ -120,7 +121,7 @@ def parse_pdo(num_elements, pdo_type, cob_id, eds, data): def parse_mpdo(num_elements, pdo_type, eds, data, cob_id): mpdo = MPDO(data) if mpdo.is_source_addressing and num_elements != 0xFE: - raise FailedValidationError(data, cob_id - 0x180, cob_id, __name__, + raise FailedValidationError(data, cob_id - MessageType.PDO1_TX.value[0], cob_id, __name__, f"MPDO type and definition do not match. " f"Check eds file at [{pdo_type}sub0]") diff --git a/canopen_monitor/parse/sdo.py b/canopen_monitor/parse/sdo.py index 4acceec..dc66c04 100644 --- a/canopen_monitor/parse/sdo.py +++ b/canopen_monitor/parse/sdo.py @@ -2,6 +2,7 @@ from .eds import EDS from .utilities import FailedValidationError, get_name, decode from typing import List +from ..can import MessageType SDO_TX = 'SDO_TX' SDO_RX = 'SDO_RX' @@ -887,12 +888,12 @@ def is_complete(self): def parse(self, cob_id: int, data: List[int], eds: EDS): node_id = None try: - if 0x580 <= cob_id < 0x600: + if cob_id in range(*MessageType.SDO_TX.value): sdo_type = SDO_TX - node_id = cob_id - 0x580 - elif 0x600 <= cob_id < 0x680: + node_id = cob_id - MessageType.SDO_TX.value[0] + elif cob_id in range(*MessageType.SDO_RX.value): sdo_type = SDO_RX - node_id = cob_id - 0x600 + node_id = cob_id - MessageType.SDO_RX.value[0] else: raise ValueError(f"Provided COB-ID {str(cob_id)} " f"is outside of the range of SDO messages") diff --git a/canopen_monitor/parse/utilities.py b/canopen_monitor/parse/utilities.py index bdf5fe9..aa5d817 100644 --- a/canopen_monitor/parse/utilities.py +++ b/canopen_monitor/parse/utilities.py @@ -81,16 +81,10 @@ def get_name(eds_config: EDS, index: List[int]) -> (str, str): def decode(defined_type: str, data: List[int]) -> str: """ - Does something? - - Arguments - --------- - defined_type `str`: The data type? - data `[int]`: The data? - - Returns - ------- - `str`: something + Decodes data by defined type + :param defined_type: Hex constant for type + :param data: list of ints to be decoded + :return: Decoded data as string """ if defined_type in (UNSIGNED8, UNSIGNED16, UNSIGNED32, UNSIGNED64): result = str(int.from_bytes(data, byteorder="little", signed=False)) diff --git a/canopen_monitor/ui/message_pane.py b/canopen_monitor/ui/message_pane.py index 422082d..1ced972 100644 --- a/canopen_monitor/ui/message_pane.py +++ b/canopen_monitor/ui/message_pane.py @@ -5,7 +5,8 @@ class MessagePane(Pane): - """A derivative of Pane customized specifically to list miscellaneous CAN + """ + A derivative of Pane customized specifically to list miscellaneous CAN messages stored in a MessageTable :param name: The name of the pane (to be printed in the top left) @@ -58,7 +59,8 @@ def __init__(self: MessagePane, self.__reset_col_widths() def resize(self: MessagePane, height: int, width: int) -> None: - """A wrapper for `Pane.resize()`. This intercepts a call for a resize + """ + A wrapper for `Pane.resize()`. This intercepts a call for a resize in order to upate MessagePane-specific details that change on a resize event. The parent `resize()` gets called first and then MessagePane's details are updated. @@ -78,26 +80,34 @@ def resize(self: MessagePane, height: int, width: int) -> None: self.__top_max = occluded if occluded > 0 else 0 def _reset_scroll_positions(self: MessagePane) -> None: + """ + Reset the scroll positions. + Initialize the y position to be zero. + Initialize the x position to be zero. + """ self.cursor = self.cursor_max self.scroll_position_y = 0 self.scroll_position_x = 0 @property def scroll_limit_y(self: MessagePane) -> int: - """The maximim rows the pad is allowed to shift by when scrolling + """ + The maximim rows the pad is allowed to shift by when scrolling """ return self.d_height - 2 @property def scroll_limit_x(self: MessagePane) -> int: - """The maximim columns the pad is allowed to shift by when scrolling + """ + The maximim columns the pad is allowed to shift by when scrolling """ max_length = sum(list(map(lambda x: x[1], self.cols.values()))) occluded = max_length - self.d_width + 7 return occluded if(occluded > 0) else 0 def scroll_up(self: MessagePane, rate: int = 1) -> None: - """This overrides `Pane.scroll_up()`. Instead of shifting the + """ + This overrides `Pane.scroll_up()`. Instead of shifting the pad vertically, the slice of messages from the `MessageTable` is shifted. @@ -123,7 +133,8 @@ def scroll_up(self: MessagePane, rate: int = 1) -> None: self.__top = min if(self.__top < min) else self.__top def scroll_down(self: MessagePane, rate: int = 1) -> None: - """This overrides `Pane.scroll_up()`. Instead of shifting the + """ + This overrides `Pane.scroll_up()`. Instead of shifting the pad vertically, the slice of messages from the `MessageTable` is shifted. @@ -149,7 +160,8 @@ def scroll_down(self: MessagePane, rate: int = 1) -> None: self.__top = max if(self.__top > max) else self.__top def __draw_header(self: Pane) -> None: - """Draw the table header at the top of the Pane + """ + Draw the table header at the top of the Pane This uses the `cols` dictionary to determine what to write """ @@ -169,7 +181,8 @@ def __draw_header(self: Pane) -> None: pos += data[1] + self.__col_sep def draw(self: MessagePane) -> None: - """Draw all records from the MessageTable to the Pane + """ + Draw all records from the MessageTable to the Pane """ super().draw() self.resize(self.v_height, self.v_width) @@ -199,11 +212,21 @@ def draw(self: MessagePane) -> None: super().refresh() def __reset_col_widths(self: Message): + """ + Reset the width of Pane collumn. + Based on the length of data to change the width. + """ for name, data in self.cols.items(): self.cols[name] = (data[0], len(name), data[2]) \ if (len(data) == 3) else (data[0], len(name)) def __check_col_widths(self: MessagePane, messages: [Message]) -> None: + """ + Check the width of the message in Pane column. + + :param messages: The list of the messages + :type messages: list + """ for message in messages: for name, data in self.cols.items(): attr = getattr(message, data[0]) diff --git a/canopen_monitor/ui/pane.py b/canopen_monitor/ui/pane.py index 222d832..822da2b 100755 --- a/canopen_monitor/ui/pane.py +++ b/canopen_monitor/ui/pane.py @@ -4,7 +4,8 @@ class Pane(ABC): - """Abstract Pane Class, contains a PAD and a window + """ + Abstract Pane Class, contains a PAD and a window :param v_height: The virtual height of the embedded pad :type v_height: int @@ -30,7 +31,8 @@ def __init__(self: Pane, x: int = 0, border: bool = True, color_pair: int = 0): - """Abstract pane initialization + """ + Abstract pane initialization :param border: Toggiling whether or not to draw a border :type border: bool @@ -64,15 +66,22 @@ def __init__(self: Pane, @property def scroll_limit_y(self: Pane) -> int: + """ + Limit the scroll on the y axis + """ return 0 @property def scroll_limit_x(self: Pane) -> int: + """ + Limit the scroll on the x axis + """ return 0 @abstractmethod def draw(self: Pane) -> None: - """Abstract draw method, must be overwritten in child class + """ + Abstract draw method, must be overwritten in child class draw should first resize the pad using: `super().resize(w, h)` then add content using: self._pad.addstr() then refresh using: `super().refresh()` @@ -91,7 +100,8 @@ def draw(self: Pane) -> None: self._pad.box() def resize(self: Pane, height: int, width: int) -> None: - """Resize the virtual pad and change internal variables to reflect that + """ + Resize the virtual pad and change internal variables to reflect that :param height: New virtual height :type height: int @@ -105,12 +115,17 @@ def resize(self: Pane, height: int, width: int) -> None: self._pad.resize(self.v_height, self.v_width) def __reset_draw_dimensions(self: Pane) -> None: + """ + Reset the pane dimensions. + You can change the width and height of the pane. + """ p_height, p_width = self.parent.getmaxyx() self.d_height = min(self.v_height, p_height - 1) self.d_width = min(self.v_width, p_width - 1) def clear(self: Pane) -> None: - """Clear all contents of pad and parent window + """ + Clear all contents of pad and parent window .. warning:: @@ -123,7 +138,8 @@ def clear(self: Pane) -> None: # self.refresh() def clear_line(self: Pane, y: int, style: any = None) -> None: - """Clears a single line of the Pane + """ + Clears a single line of the Pane :param y: The line to clear :type y: int @@ -139,7 +155,8 @@ def clear_line(self: Pane, y: int, style: any = None) -> None: self._pad.attroff(line_style) def refresh(self: Pane) -> None: - """Refresh the pane based on configured draw dimensions + """ + Refresh the pane based on configured draw dimensions """ self._pad.refresh(self.scroll_position_y, self.scroll_position_x, @@ -150,7 +167,8 @@ def refresh(self: Pane) -> None: self.needs_refresh = False def scroll_up(self: Pane, rate: int = 1) -> bool: - """Scroll pad upwards + """ + Scroll pad upwards .. note:: @@ -171,7 +189,8 @@ def scroll_up(self: Pane, rate: int = 1) -> bool: return True def scroll_down(self: Pane, rate: int = 1) -> bool: - """Scroll pad downwards + """ + Scroll pad downwards .. note:: @@ -192,7 +211,8 @@ def scroll_down(self: Pane, rate: int = 1) -> bool: return True def scroll_left(self: Pane, rate: int = 1) -> bool: - """Scroll pad left + """ + Scroll pad left .. note:: @@ -213,7 +233,8 @@ def scroll_left(self: Pane, rate: int = 1) -> bool: return True def scroll_right(self: Pane, rate: int = 1) -> bool: - """Scroll pad right + """ + Scroll pad right .. note:: @@ -241,7 +262,8 @@ def add_line(self: Pane, underline: bool = False, highlight: bool = False, color: any = None) -> None: - """Adds a line of text to the Pane and if needed, it handles the + """ + Adds a line of text to the Pane and if needed, it handles the process of resizing the embedded pad :param y: Line's row position diff --git a/canopen_monitor/ui/windows.py b/canopen_monitor/ui/windows.py index 6d6dc54..7463f1c 100755 --- a/canopen_monitor/ui/windows.py +++ b/canopen_monitor/ui/windows.py @@ -4,6 +4,7 @@ class PopupWindow(Pane): + def __init__(self: PopupWindow, parent: any, header: str = 'Alert', @@ -15,11 +16,9 @@ def __init__(self: PopupWindow, width=1, y=10, x=10) + """Set an init to Popup window""" # Pop-up window properties - self.header = header - self.content = content - self.footer = footer - self.enabled = False + self.setWindowProperties(header, content, footer) # Parent window dimensions (Usually should be STDOUT directly) p_height, p_width = self.parent.getmaxyx() @@ -28,18 +27,32 @@ def __init__(self: PopupWindow, self.content = self.break_lines(int(2 * p_width / 3), self.content) # UI dimensions - p_height, p_width = self.parent.getmaxyx() + self.setUIDimension(p_height, p_width) + + # UI properties + self.style = (style or curses.color_pair(0)) + self._pad.attron(self.style) + + + def setUIDimension(self, p_height, p_width): + """Set UI Dimension (x,y) by giving parent + height and width""" self.v_height = (len(self.content)) + 2 width = len(self.header) + 2 - if(len(self.content) > 0): + if (len(self.content) > 0): width = max(width, max(list(map(lambda x: len(x), self.content)))) self.v_width = width + 4 self.y = int(((p_height + self.v_height) / 2) - self.v_height) self.x = int(((p_width + self.v_width) / 2) - self.v_width) - # UI properties - self.style = (style or curses.color_pair(0)) - self._pad.attron(self.style) + + def setWindowProperties(self:PopupWindow, header, content, footer): + """Set default window properties""" + self.header = header + self.content = content + self.footer = footer + self.enabled = False + def break_lines(self: PopupWindow, max_width: int, @@ -49,44 +62,60 @@ def break_lines(self: PopupWindow, length = len(line) mid = int(length / 2) - if(length >= max_width): - # Break the line at the next available space - for j, c in enumerate(line[mid - 1:]): - if(c == ' '): - mid += j - break - - # Apply the line break to the content array - content.pop(i) - content.insert(i, line[:mid - 1]) - content.insert(i + 1, line[mid:]) + self.determine_to_break_content(content, i, length, line, max_width, mid) return content + def determine_to_break_content(self, content, i, length, line, max_width, mid): + if (length >= max_width): + # Break the line at the next available space + for j, c in enumerate(line[mid - 1:]): + if (c == ' '): + mid += j + break + self.apply_line_to_content_array(content, i, line, mid) + + + def apply_line_to_content_array(self, content, i, line, mid): + """Apply the line break to the content array""" + content.pop(i) + content.insert(i, line[:mid - 1]) + content.insert(i + 1, line[mid:]) + def toggle(self: PopupWindow) -> bool: self.enabled = not self.enabled return self.enabled + def __draw_header(self: PopupWindow) -> None: + """Add the header line to the window""" self.add_line(0, 1, self.header, underline=True) + def __draw__footer(self: PopupWindow) -> None: + """Add the footer to the window""" f_width = len(self.footer) + 2 self.add_line(self.v_height - 1, self.v_width - f_width, self.footer, underline=True) + + def __draw_content(self): + """Read each line of the content and add to the window""" + for i, line in enumerate(self.content): + self.add_line(1 + i, 2, line) + + def draw(self: PopupWindow) -> None: if(self.enabled): super().resize(self.v_height, self.v_width) super().draw() self.__draw_header() - - for i, line in enumerate(self.content): - self.add_line(1 + i, 2, line) - + self.__draw_content() self.__draw__footer() super().refresh() else: # super().clear() ... + + diff --git a/tests/spec_eds_parser.py b/tests/spec_eds_parser.py index 6d9ccd0..e277300 100644 --- a/tests/spec_eds_parser.py +++ b/tests/spec_eds_parser.py @@ -1,8 +1,10 @@ import unittest -import canopen_monitor.parse.eds as eds +from canopen_monitor import parse from unittest.mock import mock_open, patch from tests import TEST_EDS +eds = parse.eds + class TestEDS(unittest.TestCase): def setUp(self):