diff --git a/.github/workflows/deployment.yaml b/.github/workflows/deployment.yaml index 4ffd344..7344a23 100755 --- a/.github/workflows/deployment.yaml +++ b/.github/workflows/deployment.yaml @@ -4,6 +4,8 @@ on: release: types: [ created ] + + jobs: deploy: @@ -14,10 +16,10 @@ jobs: - name: Set up Python uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: 3.9 - name: Update Pip - run: python -m pip install --upgrade pip + run: python3 -m pip install --upgrade pip - name: Install regular and dev dependencies run: pip install .[dev] diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index 2b174e1..273d0da 100755 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -13,13 +13,13 @@ jobs: steps: - uses: actions/checkout@v2 - - name: Set up Python 3.8 + - name: Set up Python uses: actions/setup-python@v2 with: - python-version: 3.8 + python-version: 3.9 - name: Update Pip - run: python -m pip install --upgrade pip + run: python3 -m pip install --upgrade pip - name: Install regular and dev dependencies run: pip install .[dev] @@ -30,4 +30,4 @@ jobs: flake8 canopen_monitor --count --exit-zero --max-complexity=30 --max-line-length=127 --statistics - name: Run unit tests - run: pytest tests/* + run: python3 -m unittest tests/spec_*.py diff --git a/MANIFEST.in b/MANIFEST.in deleted file mode 100644 index 6d73dc0..0000000 --- a/MANIFEST.in +++ /dev/null @@ -1,8 +0,0 @@ -include *.txt -include LICENSE -recursive-include canopen_monitor *.eds -recursive-include docs *.bat -recursive-include docs *.py -recursive-include docs *.rst -recursive-include docs Makefile -recursive-include scripts *.sh diff --git a/README.md b/README.md index dc47b8c..213175f 100755 --- a/README.md +++ b/README.md @@ -2,148 +2,50 @@ [![license](https://img.shields.io/github/license/oresat/CANopen-monitor)](./LICENSE) [![pypi](https://img.shields.io/pypi/v/canopen-monitor)](https://pypi.org/project/canopen-monitor) -[![read the docs](https://readthedocs.org/projects/canopen-monitor/badge/?version=latest)](https://canopen-monitor.readthedocs.io/en/latest/?badge=latest) +[![read the docs](https://readthedocs.org/projects/canopen-monitor/badge/?version=latest)](https://canopen-monitor.readthedocs.io) [![issues](https://img.shields.io/github/issues/oresat/CANopen-monitor/bug?label=issues)](https://github.com/oresat/CANopen-monitor/issues?q=is%3Aopen+is%3Aissue+label%3Abug) [![unit tests](https://img.shields.io/github/workflow/status/oresat/CANopen-monitor/Unit%20Tests?label=unit%20tests)](https://github.com/oresat/CANopen-monitor/actions?query=workflow%3A%22Unit+Tests%22) [![deployment](https://img.shields.io/github/workflow/status/oresat/CANopen-monitor/Deploy%20to%20PyPi?label=deployment)](https://github.com/oresat/CANopen-monitor/actions?query=workflow%3A%22Deploy+to+PyPi%22) -A utility for displaying and tracking activity over the CAN bus. +An NCurses-based TUI application for tracking activity over the CAN bus and decoding messages with provided EDS/OD files. *** -# Quick Start *(Usage)* +# Quick Start -## Install *(from PyPi)* +### Install -`$` `pip install package-demo` +`$` `pip install canopen-monitor` - -## Run +### Run `$` `canopen-monitor` *** -# Development and Contribution +# Configuration -## Build +The default configurations provided by CANOpen Monitor can be found in [canopen_monitor/assets](./canopen_monitor/assets). These are the default assets provided. At runtime these configs are copied to `~/.config/canopen-monitor` where they can be modified and the changes will persist. -`$` `python setup.py bdist_wheel sdist` +*** -## Install Locally +# Development and Contribution -`$` `pip install -e .[dev]` +### Documentation -*(The `-e` flag creates a symbolic-link to your local development version, so there's no need to uninstall and reinstall every time. Set it and forget it.)* +Check out our [Read The Docs](https://canopen-monitor.readthedocs.io) pages for more info on the application sub-components and methods. -## Create or Update Manifest +### Install Locally -`$` `rm -f MANIFEST.in && check-manifest --update` +`$` `pip install -e .[dev]` -## Create or Update Sphinx Documentation +*(Note: the `-e` flag creates a symbolic-link to your local development version. Set it once, and forget it)* -`$` `sphinx-apidoc -f -o docs canopen_monitor && make -C docs html` +### Create Documentation Locally -*** +`$` `make -C docs clean html` -# Default Configs - -These are the auto-generated configs that can be found at `~/.config/canopen-monitor/` - -`devices.json:` -```json -{ - "dead_timeout": 120, - "devices": [ - "can0" - ], - "stale_timeout": 60 -} -``` -A set of devices configs including a list of CAN Buses that CAN Monitor will try to bind to on launch as well as respective timeout lengths. - -*(note: additional buses can be added via cmd-line arguments, see `canopen-monitor --help`)* - -  - -`layout.json` -```json -{ - "data": [ - { - "data": [ - { - "capacity": null, - "fields": { - "COB ID": "arb_id", - "Node Name": "node_name", - "Interface": "interface", - "State": "status", - "Status": "parsed_msg" - }, - "frame_types": [ - "HEARTBEAT" - ], - "name": "Hearbeats", - "type": "message_table" - }, - { - "capacity": null, - "fields": [], - "frame_types": [], - "name": "Info", - "type": "message_table" - } - ], - "split": "vertical", - "type": "grid" - }, - { - "capacity": null, - "fields": { - "COB ID": "arb_id", - "Node Name": "node_name", - "Interface": "interface", - "Type": "message_type", - "Time Stamp": "timestamp", - "Message": "parsed_msg" - }, - "frame_types": [ - "NMT", - "SYNC", - "TIME", - "EMER", - "PDO1_TX", - "PDO1_RX", - "PDO2_TX", - "PDO2_RX", - "PDO3_TX", - "PDO3_RX", - "PDO4_TX", - "PDO4_RX", - "SDO_TX", - "SDO_RX", - "UKNOWN" - ], - "name": "Misc", - "type": "message_table" - } - ], - "split": "horizontal", - "type": "grid" -} -``` -A recursive set of dictionaries that define how CAN Monitor constructs the UI layout and what CAN Message types go to what tables. - -  - -`nodes.json` -```json -{ - "64": "MDC" -} -``` -A list of COB ID's in decimal notation that have a paired name which will override the default display name of that node in CAN Monitor. +*(Note: documentation is configured to auto-build with ReadTheDocs on every push to master)* *** diff --git a/canopen_monitor/__init__.py b/canopen_monitor/__init__.py index 84d9ce4..68e6491 100755 --- a/canopen_monitor/__init__.py +++ b/canopen_monitor/__init__.py @@ -5,23 +5,19 @@ PATCH = 0 APP_NAME = 'canopen-monitor' -APP_DESCRIPTION \ - = 'A utility for displaying and tracking activity over the CAN bus.' -APP_VERSION = "{}.{}.{}".format(MAJOR, MINOR, PATCH) -APP_AUTHOR = "Dmitri McGuckin" +APP_DESCRIPTION = 'An NCurses-based TUI application for tracking activity' \ + ' over the CAN bus and decoding messages with provided' \ + ' EDS/OD files.' +APP_VERSION = f'{MAJOR}.{MINOR}.{PATCH}' +APP_AUTHOR = 'Dmitri McGuckin' APP_EMAIL = 'dmitri3@pdx.edu' -APP_URL = "https://github.com/oresat/CANopen-monitor" +APP_URL = 'https://github.com/oresat/CANopen-monitor' APP_LICENSE = 'GPL-3.0' -CONFIG_FORMAT_VERSION = 1 -CONFIG_DIR = os.path.expanduser('~/.config/{}'.format(APP_NAME)) + os.sep -CACHE_DIR = os.path.expanduser('~/.cache/{}'.format(APP_NAME)) + os.sep -ASSETS_DIR \ - = os.path.abspath(__path__[0] + os.sep + 'assets') + os.sep -EDS_DIR = ASSETS_DIR -DEVICES_CONFIG = CONFIG_DIR + 'devices.json' -LAYOUT_CONFIG = CONFIG_DIR + 'layout.json' -NODES_CONFIG = CONFIG_DIR + 'nodes.json' +MAINTAINER_NAME = 'Portland State Aerospace Society' +MAINTAINER_EMAIL = 'oresat@pdx.edu' -DEBUG = False -TIMEOUT = 0.1 +CONFIG_DIR = os.path.expanduser(f'~/.config/{APP_NAME}') +CACHE_DIR = os.path.expanduser(f'~/.cache/{APP_NAME}') + +CONFIG_FORMAT_VERSION = 2 diff --git a/canopen_monitor/__main__.py b/canopen_monitor/__main__.py index 9be976d..b49e03b 100755 --- a/canopen_monitor/__main__.py +++ b/canopen_monitor/__main__.py @@ -1,113 +1,66 @@ import os import argparse -import canopen_monitor as cm -import canopen_monitor.utilities as utils -import canopen_monitor.parser.eds as eds -from canopen_monitor.monitor_app import MonitorApp +from . import APP_NAME, APP_DESCRIPTION, CONFIG_DIR, CACHE_DIR +from .app import App +from .can import MagicCANBus, MessageTable +from .parse import CANOpenParser, load_eds_file -def ensure_config_load(filepath: str) -> dict: - # Attempt to load config from file - try: - config = utils.load_config(filepath) - # If it doesn't exist, call the config factory to generate it, then load it - except FileNotFoundError: - utils.config_factory(filepath) - config = utils.load_config(filepath) - finally: - return config +def init_dirs(): + os.makedirs(CONFIG_DIR, exist_ok=True) + os.makedirs(CACHE_DIR, exist_ok=True) -def load_eds_configs(eds_path: str) -> dict: +def load_eds_files(filepath: str = CACHE_DIR) -> dict: configs = {} - for file in os.listdir(cm.EDS_DIR): - file = cm.EDS_DIR + file - eds_config = eds.load_eds_file(file) - node_id = eds_config[2101].default_value - - if(cm.DEBUG): - print('Loaded config for {}({}) witn {} registered subindicies!' - .format(eds_config.device_info.product_name, - node_id, - len(eds_config))) - configs[node_id] = eds_config + for file in os.listdir(filepath): + full_path = f'{filepath}/{file}' + config = load_eds_file(full_path) + configs[config.node_id] = config return configs -def overwrite_node_names(node_names: dict, eds_configs: dict): - for node_id, new_name in node_names.items(): - eds_config = eds_configs.get(node_id) - if(eds_config is None): - if(cm.DEBUG): - print('Tried to override Node ID: {} but no EDS config was \ - registered with that ID! Skipping!'.format(node_id)) - else: - if(cm.DEBUG): - print('Modifying {} to have product name: {}' - .format(eds_config, new_name)) - eds_config.device_info.product_name = new_name - - def main(): - # Setup program arguments and options - parser = argparse.ArgumentParser(prog=cm.APP_NAME, - description=cm.APP_DESCRIPTION, + parser = argparse.ArgumentParser(prog=APP_NAME, + description=APP_DESCRIPTION, allow_abbrev=False) - parser.add_argument('-v', '--verbose', - dest='debug', - action='store_true', - default=False, - help='enable additional debug info') - parser.add_argument('-i', '--interfaces', + parser.add_argument('-i', '--interface', dest='interfaces', type=str, - nargs=1, - default="", - help='specify additional busses to listen on') + nargs='+', + default=['vcan0'], + help='A list of interfaces to bind to.') + parser.add_argument('--no-block', + dest='no_block', + action='store_true', + default=False, + help='Disable block-waiting for the Magic CAN Bus.' + ' (Warning, this may produce undefined' + ' behavior).') args = parser.parse_args() - # Set important app-runtime flags - cm.DEBUG = args.debug - - # Guarentee the config directory exists - utils.generate_dirs() - - # Fetch the devices configurations - devices_cfg = ensure_config_load(cm.DEVICES_CONFIG) - dev_names = devices_cfg['devices'] - timeouts = (devices_cfg['stale_timeout'], devices_cfg['dead_timeout']) - - # If any interfaces are specified by command line, add them to the list - if(len(args.interfaces) > 0): - dev_names += args.interfaces[0].split(' ') - - # Fetch the table schemas - table_schema = ensure_config_load(cm.LAYOUT_CONFIG) - - # Fetch all of the EDS files that exist - eds_configs = load_eds_configs(cm.EDS_DIR) - - # Fetch all of the node-name overrides - node_names = ensure_config_load(cm.NODES_CONFIG) - - # Overwrite the node names - overwrite_node_names(node_names, eds_configs) - - # Create the app - canmonitor = MonitorApp(dev_names, timeouts, table_schema, eds_configs) - try: - # Start the application - canmonitor.start() + init_dirs() + eds_configs = load_eds_files() + mt = MessageTable(CANOpenParser(eds_configs)) + + # Start the can bus and the curses app + with MagicCANBus(args.interfaces, no_block=args.no_block) as bus, \ + App(mt) as app: + while True: + # Bus updates + for message in bus: + if message is not None: + mt += message + + # User Input updates + app._handle_keyboard_input() + + # Draw update + app.draw(bus.statuses) except KeyboardInterrupt: - # Stop the application on Ctrl+C input - print('Stopping {}...'.format(cm.APP_NAME)) - finally: - # Ensure that the application is properly stopped - # and that all of its threads are gracefully closed out - canmonitor.stop() print('Goodbye!') -if __name__ == "__main__": +if __name__ == '__main__': main() diff --git a/canopen_monitor/app.py b/canopen_monitor/app.py new file mode 100644 index 0000000..e2d1a36 --- /dev/null +++ b/canopen_monitor/app.py @@ -0,0 +1,206 @@ +from __future__ import annotations +import curses +import datetime as dt +from enum import Enum +from . import APP_NAME, APP_VERSION, APP_LICENSE, APP_AUTHOR, APP_DESCRIPTION, APP_URL +from .can import MessageTable, MessageType +from .ui import MessagePane, PopupWindow + + +def pad_hex(value: int) -> str: + return f'0x{hex(value).upper()[2:].rjust(3, "0")}' + + +class KeyMap(Enum): + F1 = ('F1', 'Toggle app info menu', curses.KEY_F1) + F2 = ('F2', 'Toggle this menu', curses.KEY_F2) + UP_ARR = ('Up Arrow', 'Scroll pane up 1 row', curses.KEY_UP) + DOWN_ARR = ('Down Arrow', 'Scroll pane down 1 row', curses.KEY_DOWN) + LEFT_ARR = ('Left Arrow', 'Scroll pane left 4 cols', curses.KEY_LEFT) + RIGHT_ARR = ('Right Arrow', 'Scroll pane right 4 cols', curses.KEY_RIGHT) + S_UP_ARR = ('Shift + Up Arrow', 'Scroll pane up 16 rows', 337) + S_DOWN_ARR = ('Shift + Down Arrow', 'Scroll pane down 16 rows', 336) + C_UP_ARR = ('Ctrl + Up Arrow', 'Move pane selection up', 567) + C_DOWN_ARR = ('Ctrl + Down Arrow', 'Move pane selection down', 526) + RESIZE = ('Resize Terminal', + 'Reset the dimensions of the app', + curses.KEY_RESIZE) + + +class App: + """The User Interface + """ + + def __init__(self: App, message_table: MessageTable): + self.table = message_table + self.selected_pane_pos = 0 + self.selected_pane = None + + def __enter__(self: App): + # Monitor setup, take a snapshot of the terminal state + self.screen = curses.initscr() # Initialize standard out + self.screen.scrollok(True) # Enable window scroll + self.screen.keypad(True) # Enable special key input + self.screen.nodelay(True) # Disable user-input blocking + curses.curs_set(False) # Disable the cursor + self.__init_color_pairs() # Enable colors and create pairs + + # Don't initialize any grids, sub-panes, or windows until standard io + # screen has been initialized + height, width = self.screen.getmaxyx() + height -= 1 + self.info_win = PopupWindow(self.screen, + header=f'{APP_NAME.title()}' + f' v{APP_VERSION}', + content=[f'author: {APP_AUTHOR}', + f'license: {APP_LICENSE}', + f'respository: {APP_URL}', + '', + 'Description:', + f'{APP_DESCRIPTION}'], + footer='F1: exit window', + style=curses.color_pair(1)) + self.hotkeys_win = PopupWindow(self.screen, + header='Hotkeys', + content=list( + map(lambda x: + f'{x.value[0]}: {x.value[1]}' + f' ({x.value[2]})', + list(KeyMap))), + footer='F2: exit window', + style=curses.color_pair(1)) + self.hb_pane = MessagePane(cols={'Node ID': ('node_name', 0, hex), + 'State': ('state', 0), + 'Status': ('message', 0)}, + types=[MessageType.HEARTBEAT], + parent=self.screen, + height=int(height / 2) - 1, + width=width, + y=1, + x=0, + name='Heartbeats', + message_table=self.table) + self.misc_pane = MessagePane(cols={'COB ID': ('arb_id', 0, pad_hex), + 'Node Name': ('node_name', 0, hex), + 'Type': ('type', 0), + 'Message': ('message', 0)}, + types=[MessageType.NMT, + MessageType.SYNC, + MessageType.TIME, + MessageType.EMER, + MessageType.SDO, + MessageType.PDO], + parent=self.screen, + height=int(height / 2), + width=width, + y=int(height / 2), + x=0, + name='Miscellaneous', + message_table=self.table) + self.__select_pane(self.hb_pane, 0) + return self + + def __exit__(self: App, type, value, traceback) -> None: + # Monitor destruction, restore terminal state + curses.nocbreak() # Re-enable line-buffering + curses.noecho() # Enable user-input echo + curses.curs_set(True) # Enable the cursor + curses.resetty() # Restore the terminal state + curses.endwin() # Destroy the virtual screen + + def _handle_keyboard_input(self: App) -> None: + """This is only a temporary implementation + + .. deprecated:: + + Soon to be removed + """ + # Grab user input + input = self.screen.getch() + curses.flushinp() + + if(input == curses.KEY_UP): + self.selected_pane.scroll_up() + elif(input == curses.KEY_DOWN): + self.selected_pane.scroll_down() + elif(input == 337): # Shift + Up + self.selected_pane.scroll_up(rate=16) + elif(input == 336): # Shift + Down + self.selected_pane.scroll_down(rate=16) + elif(input == curses.KEY_LEFT): + self.selected_pane.scroll_left(rate=4) + elif(input == curses.KEY_RIGHT): + self.selected_pane.scroll_right(rate=4) + elif(input == curses.KEY_RESIZE): + self.hb_pane._reset_scroll_positions() + self.misc_pane._reset_scroll_positions() + self.screen.clear() + elif(input == 567): # Ctrl + Up + self.__select_pane(self.hb_pane, 0) + elif(input == 526): # Ctrl + Down + self.__select_pane(self.misc_pane, 1) + elif(input == curses.KEY_F1): + if(self.hotkeys_win.enabled): + self.hotkeys_win.toggle() + self.hotkeys_win.clear() + self.info_win.toggle() + elif(input == curses.KEY_F2): + if(self.info_win.enabled): + self.info_win.toggle() + self.info_win.clear() + self.hotkeys_win.toggle() + + def __init_color_pairs(self: App) -> None: + curses.start_color() + # Implied: color pair 0 is standard black and white + curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) + curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK) + curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) + curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) + curses.init_pair(5, curses.COLOR_MAGENTA, curses.COLOR_BLACK) + + def __select_pane(self: App, pane: MessagePane, pos: int) -> None: + # Only undo previous selection if there was any + if(self.selected_pane is not None): + self.selected_pane.selected = False + + # Select the new pane and change internal Pane state to indicate it + self.selected_pane = pane + self.selected_pane_pos = pos + self.selected_pane.selected = True + + def __draw_header(self: App, ifaces: [tuple]) -> None: + # Draw the timestamp + date_str = f'{dt.datetime.now().ctime()},' + self.screen.addstr(0, 0, date_str) + pos = len(date_str) + 1 + + # Draw the interfaces + for iface in ifaces: + color = curses.color_pair(1) if iface[1] else curses.color_pair(3) + sl = len(iface[0]) + self.screen.addstr(0, pos, iface[0], color) + pos += sl + 1 + + def __draw__footer(self: App) -> None: + height, width = self.screen.getmaxyx() + footer = ': Info, : Hotkeys' + self.screen.addstr(height - 1, 1, footer) + + def draw(self: App, ifaces: [tuple]): + window_active = self.info_win.enabled or self.hotkeys_win.enabled + self.__draw_header(ifaces) # Draw header info + + # Draw panes + if(not window_active): + self.hb_pane.draw() + self.misc_pane.draw() + + # Draw windows + self.info_win.draw() + self.hotkeys_win.draw() + + self.__draw__footer() # Draw footer info + + def refresh(self: App): + self.screen.refresh() diff --git a/canopen_monitor/assets/devices.json b/canopen_monitor/assets/devices.json new file mode 100644 index 0000000..95a016d --- /dev/null +++ b/canopen_monitor/assets/devices.json @@ -0,0 +1,8 @@ +{ + "config_format_version": 2, + "dead_timeout": 120, + "devices": [ + "can0" + ], + "stale_timeout": 60 +} diff --git a/canopen_monitor/assets/CFC_OD.eds b/canopen_monitor/assets/eds/CFC_OD.eds old mode 100755 new mode 100644 similarity index 100% rename from canopen_monitor/assets/CFC_OD.eds rename to canopen_monitor/assets/eds/CFC_OD.eds diff --git a/canopen_monitor/assets/GPS_OD.eds b/canopen_monitor/assets/eds/GPS_OD.eds old mode 100755 new mode 100644 similarity index 100% rename from canopen_monitor/assets/GPS_OD.eds rename to canopen_monitor/assets/eds/GPS_OD.eds diff --git a/canopen_monitor/assets/live_OD.eds b/canopen_monitor/assets/eds/live_OD.eds old mode 100755 new mode 100644 similarity index 100% rename from canopen_monitor/assets/live_OD.eds rename to canopen_monitor/assets/eds/live_OD.eds diff --git a/canopen_monitor/assets/star_tracker_OD.eds b/canopen_monitor/assets/eds/star_tracker_OD.eds old mode 100755 new mode 100644 similarity index 100% rename from canopen_monitor/assets/star_tracker_OD.eds rename to canopen_monitor/assets/eds/star_tracker_OD.eds diff --git a/canopen_monitor/assets/layout.json b/canopen_monitor/assets/layout.json new file mode 100644 index 0000000..0f9869d --- /dev/null +++ b/canopen_monitor/assets/layout.json @@ -0,0 +1,65 @@ +{ + "config_format_version": 2, + "data": [ + { + "data": [ + { + "capacity": null, + "fields": { + "COB ID": "arb_id", + "Node Name": "node_name", + "Interface": "interface", + "State": "status", + "Status": "parsed_msg" + }, + "frame_types": [ + "HEARTBEAT" + ], + "name": "Hearbeats", + "type": "message_table" + }, + { + "capacity": null, + "fields": [], + "frame_types": [], + "name": "Info", + "type": "message_table" + } + ], + "split": "vertical", + "type": "grid" + }, + { + "capacity": null, + "fields": { + "COB ID": "arb_id", + "Node Name": "node_name", + "Interface": "interface", + "Type": "message_type", + "Time Stamp": "timestamp", + "Message": "parsed_msg" + }, + "frame_types": [ + "NMT", + "SYNC", + "TIME", + "EMER", + "PDO1_TX", + "PDO1_RX", + "PDO2_TX", + "PDO2_RX", + "PDO3_TX", + "PDO3_RX", + "PDO4_TX", + "PDO4_RX", + "SDO_TX", + "SDO_RX", + "UKNOWN" + ], + "name": "Misc", + "type": "message_table" + } + ], + "split": "horizontal", + "type": "grid" +} diff --git a/canopen_monitor/assets/nodes.json b/canopen_monitor/assets/nodes.json new file mode 100644 index 0000000..8a7ab44 --- /dev/null +++ b/canopen_monitor/assets/nodes.json @@ -0,0 +1,6 @@ +{ + "config_format_version": 2, + "nodes": { + "40": "MDC" + } +} diff --git a/canopen_monitor/can/__init__.py b/canopen_monitor/can/__init__.py new file mode 100644 index 0000000..11f1348 --- /dev/null +++ b/canopen_monitor/can/__init__.py @@ -0,0 +1,18 @@ +"""This module is primarily responsible for providing a reliable high-level +interface to the CAN Bus as well as describing the format and structure of raw +CAN messages according to the +`CANOpen spec `_. +""" +from .message import Message, MessageState, MessageType +from .message_table import MessageTable +from .interface import Interface +from .magic_can_bus import MagicCANBus + +__all__ = [ + 'Message', + "MessageState", + "MessageType", + "MessageTable", + 'Interface', + 'MagicCANBus', +] diff --git a/canopen_monitor/can/interface.py b/canopen_monitor/can/interface.py new file mode 100644 index 0000000..ca5b9ae --- /dev/null +++ b/canopen_monitor/can/interface.py @@ -0,0 +1,187 @@ +from __future__ import annotations +import psutil +import socket +import datetime as dt +from .message import Message +from pyvit.hw.socketcan import SocketCanDev + + +_SOCK_TIMEOUT = 0.1 +_STALE_INTERFACE = dt.timedelta(minutes=1) + + +class Interface(SocketCanDev): + """This is a model of a POSIX interface + + Used to manage a singular interface and any encoded messages streaming + across it + + :param name: Name of the interface bound to + :type name: str + + :param last_activity: Timestamp of the last activity on the interface + :type last_activity: datetime.datetime + """ + + def __init__(self: Interface, if_name: str): + """Interface constructor + + :param if_name: The name of the interface to bind to + :type if_name: str + """ + super().__init__(if_name) + self.name = if_name + self.last_activity = dt.datetime.now() + self.socket.settimeout(_SOCK_TIMEOUT) + self.listening = False + + def __enter__(self: Interface) -> Interface: + """The entry point of an `Interface` in a `with` statement + + This binds to the socket interface name specified. + + .. warning:: + This block-waits until the provided interface comes up before + binding to the socket. + + :returns: Itself + :rtype: Interface + + :Example: + + >>> with canopen_monitor.Interface('vcan0') as dev: + >>> print(f'Message: {dev.recv()}') + """ + self.start() + return self + + def __exit__(self: Interface, etype, evalue, traceback) -> None: + """The exit point of an `Interface` in a `with` statement + + This closes the socket previously bound to + + :param etype: The type of event + :type etype: str + + :param evalue: The event + :type evalue: str + + :param traceback: The traceback of the previously exited block + :type traceback: TracebackException + """ + self.stop() + + def start(self: Interface, block_wait: bool = True) -> None: + """A wrapper for `pyvit.hw.SocketCanDev.start()` + + If block-waiting is enabled, then instead of imediately binding to the + interface, it waits for the state to change to `UP` first before + binding. + + :param block_wait: Enables block-waiting + :type block_wait: bool + """ + while(block_wait and not self.is_up): + pass + super().start() + self.listening = True + + def stop(self: Interface) -> None: + """A wrapper for `pyvit.hw.SocketCanDev.stop()` + """ + super().stop() + self.listening = False + + def restart(self: Interface) -> None: + """A macro-fuction for restarting the interface connection + + This is the same as doing: + + >>> iface.stop() + >>> iface.start() + """ + self.stop() + self.start() + + def recv(self: Interface) -> Message: + """A wrapper for `pyvit.hw.SocketCanDev.recv()` + + Instead of returning a `can.Frame`, it intercepts the `recv()` and + converts it to a `canopen_monitor.Message` at the last minute. + + :return: A loaded `canopen_monitor.Message` from the interface if a + message is recieved within the configured SOCKET_TIMEOUT (default + is 0.3 seconds), otherwise returns None + :rtype: Message, None + """ + try: + frame = super().recv() + self.last_activity = dt.datetime.now() + return Message(frame.arb_id, + data=list(frame.data), + frame_type=frame.frame_type, + interface=self.name, + timestamp=dt.datetime.now(), + extended=frame.is_extended_id) + except OSError: + return None + except socket.timeout: + return None + + @property + def is_up(self: Interface) -> bool: + """Determines if the interface is in the `UP` state + + :returns: `True` if in the `UP` state `False` if in the `DOWN` state + :rtype: bool + """ + if_dev = psutil.net_if_stats().get(self.name) + if(if_dev is not None): + return if_dev.isup and self.age < _STALE_INTERFACE + return False + + @property + def duplex(self: Interface) -> int: + """Determines the duplex, if there is any + + :returns: Duplex value + :rtype: int + """ + val = Interface.__get_if_data(self.name) + return val.duplex if val is not None else None + + @property + def speed(self: Interface) -> int: + """Determines the Baud Rate of the bus, if any + + .. warning:: + + This will appear as `0` for virtual can interfaces. + + :return: Baud rate + :rtype: int + """ + val = Interface.__get_if_data(self.name) + return val.speed if val is not None else None + + @property + def mtu(self: Interface) -> int: + """Maximum Transmission Unit + + :return: Maximum size of a packet + :rtype: int + """ + val = Interface.__get_if_data(self.name) + return val.mtu if val is not None else None + + @property + def age(self: Interface) -> dt.timedelta: + """Deterimes the age of the message, since it was received + + :return: Age of the message + :rtype: datetime.timedelta + """ + return dt.datetime.now() - self.last_activity + + def __str__(self: Interface) -> str: + return self.name diff --git a/canopen_monitor/can/magic_can_bus.py b/canopen_monitor/can/magic_can_bus.py new file mode 100644 index 0000000..c97a941 --- /dev/null +++ b/canopen_monitor/can/magic_can_bus.py @@ -0,0 +1,126 @@ +from __future__ import annotations +from .interface import Interface +from .message import Message +import queue +import threading as t + + +class MagicCANBus: + """This is a macro-manager for multiple CAN interfaces + + :param interfaces: The list of serialized Interface objects the bus is + managing + :type interfaces: [Interface] + """ + + def __init__(self: MagicCANBus, if_names: [str], no_block: bool = False): + self.interfaces = list(map(lambda x: Interface(x), if_names)) + self.no_block = no_block + self.keep_alive = t.Event() + self.keep_alive.set() + self.message_queue = queue.SimpleQueue() + self.threads = None + + @property + def statuses(self: MagicCANBus) -> [tuple]: + """This property is simply an aggregate of all of the interfaces and + whether or not they both exist and are in the `UP` state + + :return: a list of tuples containing the interface names and a bool + indication an `UP/DOWN` status + :rtype: [tuple] + """ + return list(map(lambda x: (x.name, x.is_up), self.interfaces)) + + def start_handler(self: MagicCANBus, iface: Interface) -> t.Thread: + """This is a wrapper for starting a single interface listener thread + + .. warning:: + + If for any reason, the interface cannot be listened to, (either + it doesn't exist or there are permission issues in reading from + it), then the default behavior is to stop listening for + messages, block wait for the interface to come back up, then + resume. It is possible that a thread starts but no listener + starts due to a failure to bind to the interface. + + :param iface: The interface to bind to when listening for messages + :type iface: Interface + + :return: The new listener thread spawned + :rtype: threading.Thread + """ + tr = t.Thread(target=self.handler, + name=f'canopem-monitor-{iface.name}', + args=[iface], + daemon=True) + tr.start() + return tr + + def handler(self: MagicCANBus, iface: Interface) -> None: + """This is a handler for listening and block-waiting for messages on + the CAN bus + + It will operate on the condition that the Magic Can Bus is still + active, using thread-safe events. + + :param iface: The interface to bind to when listening for messages + :type iface: Interface + """ + iface.start() + + # The outer loop exists to enable interface recovery, if the interface + # is either deleted or goes down, the handler will try to start it + # again and read messages as soon as possible + while(self.keep_alive.is_set()): + try: + # The inner loop is the constant reading of the bus and loading + # of frames into a thread-safe queue. It is necessary to + # check `iface.is_up` in the inner loop as well, so that the + # handler will not block on bus reading if the MCB is trying + # to close all threads and destruct itself + while(self.keep_alive.is_set() and iface.is_up): + frame = iface.recv() + if(frame is not None): + self.message_queue.put(frame, block=True) + iface.restart() + except OSError: + iface.restart() + iface.stop() + + def __enter__(self: MagicCANBus) -> MagicCANBus: + self.threads = list(map(lambda x: self.start_handler(x), + self.interfaces)) + return self + + def __exit__(self: MagicCANBus, + etype: str, + evalue: str, + traceback: any) -> None: + self.keep_alive.clear() + if(self.no_block): + print('WARNING: Skipping wait-time for threads to close' + ' gracefully.') + else: + print('Press to quit without waiting.') + for tr in self.threads: + print(f'Waiting for thread {tr} to end... ', end='') + tr.join() + print('Done!') + + def __iter__(self: MagicCANBus) -> MagicCANBus: + return self + + def __next__(self: MagicCANBus) -> Message: + if(self.message_queue.empty()): + raise StopIteration + return self.message_queue.get(block=True) + + def __str__(self: MagicCANBus) -> str: + # Subtract 1 since the parent thread should not be counted + alive_threads = t.active_count() - 1 + if_list = ', '.join(list(map(lambda x: str(x), self.interfaces))) + return f"Magic Can Bus: {if_list}," \ + f" pending messages: {self.message_queue.qsize()}" \ + f" threads: {alive_threads}," \ + f" keep-alive: {self.keep_alive.is_set()}" diff --git a/canopen_monitor/can/message.py b/canopen_monitor/can/message.py new file mode 100644 index 0000000..82fcd6e --- /dev/null +++ b/canopen_monitor/can/message.py @@ -0,0 +1,216 @@ +from __future__ import annotations +import datetime as dt +from enum import Enum +from pyvit.can import Frame + +STALE_TIME = dt.timedelta(minutes=2) +DEAD_TIME = dt.timedelta(minutes=4) + + +class MessageType(Enum): + """This enumeration describes all of the ranges in the CANOpen spec that + defines specific kinds of messages. + + See `wikipedia + `_ + for details + """ + # Regular CANOpen message types + NMT = (0x0, 0x0) + SYNC = (0x1, 0x7F) + TIME = (0x100, 0x100) + EMER = (0x80, 0x0FF) + PDO1_TX = (0x180, 0x1FF) + PDO1_RX = (0x200, 0x27F) + PDO2_TX = (0x280, 0x2FF) + PDO2_RX = (0x300, 0x37F) + PDO3_TX = (0x380, 0x3FF) + PDO3_RX = (0x400, 0x47F) + PDO4_TX = (0x480, 0x4FF) + PDO4_RX = (0x500, 0x57F) + SDO_TX = (0x580, 0x5FF) + SDO_RX = (0x600, 0x680) + HEARTBEAT = (0x700, 0x7FF) + + # Special Types + UKNOWN = (-0x1, -0x1) # Pseudo type unknown + PDO = (0x180, 0x57F) # Super type PDO + SDO = (0x580, 0x680) # Super type SDO + + @property + def supertype(self: MessageType) -> MessageType: + """Determines the "Supertype" of a Message + + There are only two supertypes: MessageType.PDO and MessageType.SDO, + and they emcompass all of the PDO_T/RX and SDO_T/RX ranges + respectively. This simply returns which range the type is in if any, + or MessageType.UNKNOWN if it's in neither supertype range. + + :return: The supertype of this type + :rtype: MessageType + """ + if(self.value[0] >= self.PDO.value[0] + and self.value[0] <= self.PDO.value[1]): + return MessageType['PDO'] + elif(self.value[0] >= self.SDO.value[0] + and self.value[0] <= self.SDO.value[1]): + return MessageType['SDO'] + else: + return MessageType['UKNOWN'] + + @staticmethod + def cob_to_node(mtype: MessageType, cob_id: int) -> int: + """Determines the Node ID based on the given COB ID + + The COB ID is the raw ID sent with the CAN message, and the node id is + simply the sub-id within the COB ID, which is used as a device + identifier. + + :Example: + + If the COB ID is 0x621 + + Then the Type is SDO_RX (an SDO being received) + + The start of the SDO_RX range is 0x600 + + Therefore the Node ID is 0x621 - 0x600 = 0x21 + + :param mtype: The message type + :type mtype: MessageType + + :param cob_id: The Raw CAN Message COB ID + :type cob_id: int + + :return: The Node ID + :rtype: int + """ + return cob_id - mtype.value[0] + + @staticmethod + def cob_id_to_type(cob_id: int) -> MessageType: + """Determines the message type based on the COB ID + + :param cob_id: The Raw CAN Message COB ID + :type cob_id: int + + :return: The message type (range) the COB ID fits into + :rtype: MessageType + """ + for t in list(MessageType): + if(cob_id >= t.value[0] and cob_id <= t.value[1]): + return t + return MessageType['UKNOWN'] + + def __str__(self) -> str: + return self.name + + +class MessageState(Enum): + """This enumeration describes all possible states of a CAN Message + + +-----+----------+ + |State|Age (sec) | + +=====+==========+ + |ALIVE|x<60 | + +-----+----------+ + |STALE|60<=x<=120| + +-----+----------+ + |DEAD |120<=x | + +-----+----------+ + """ + ALIVE = 'Alive' + STALE = 'Stale' + DEAD = 'Dead' + + def __str__(self: MessageState) -> str: + """ Overloaded `str()` operator + """ + return self.value + ' ' + + +class Message(Frame): + """This class is a wrapper class for the `pyvit.can.Frame` class + + :ref: `See this for documentation on a PyVit Frame + `_ + + It's primary purpose is to carry all of the same CAN message data as a + frame, while adding age and state attributes as well. + """ + + def __init__(self: Message, arb_id: int, **kwargs): + super().__init__(arb_id, **kwargs) + self.node_name = MessageType.cob_to_node(self.type, self.arb_id) + self.message = self.data + + @property + def age(self: Message) -> dt.timedelta: + """The age of the Message since it was received from the CAN bus + + :return: Age of the message + :rtype: datetime.timedelta + """ + return dt.datetime.now() - self.timestamp + + @property + def state(self: Message) -> MessageState: + """The state of the message since it was received from the CAN bus + + :return: State of the message + :rtype: MessageState + """ + if(self.age >= DEAD_TIME): + return MessageState['DEAD'] + elif(self.age >= STALE_TIME): + return MessageState['STALE'] + else: + return MessageState['ALIVE'] + + @property + def type(self: Message) -> MessageType: + """Type of CAN Message + + :return: CAN Message Type + :rtype: MessageType + """ + return MessageType.cob_id_to_type(self.arb_id) + + @property + def supertype(self: Message) -> MessageType: + """Super-Type of CAN Message + + :return: CAN Message Super-Type + :rtype: MessageType + """ + return self.type.supertype + + @property + def node_id(self: Message) -> int: + """The Node ID, otherwise known as the unique device identifier + + This is a property that is arbitratily decided in an Object Dictionary + and can sometimes have a name attatched to it + + :Example: + + 0x621 and 0x721 are addressing the same device on the network, + because both of them share the Node ID of 0x21 + + :return: Node ID + :rtype: int + """ + return MessageType.cob_to_node(self.type, self.arb_id) + + def __lt__(self: Message, src: Message): + """Overloaded less-than operator, primarilly to support `sorted()` + on a list of `Message`, such that it's sorted by COB ID + + :param src: The right-hand message to compare against + :type src: Message + + .. example:: + + self < src + """ + return self._arb_id < src._arb_id diff --git a/canopen_monitor/can/message_table.py b/canopen_monitor/can/message_table.py new file mode 100644 index 0000000..4794460 --- /dev/null +++ b/canopen_monitor/can/message_table.py @@ -0,0 +1,47 @@ +from __future__ import annotations +from .message import Message, MessageType + + +class MessageTable: + def __init__(self: MessageTable, parser=None): + self.parser = parser + self.table = {} + + def __add__(self: MessageTable, message: Message) -> MessageTable: + if(self.parser is not None): + message.message = self.parser.parse(message) + self.table[message.arb_id] = message + return self + + def __len__(self: MessageTable) -> int: + return len(self.table) + + def filter(self: MessageTable, + types: MessageType, + start: int = 0, + end: int = None) -> [Message]: + end = len(self.table) if end is None else end + messages = list(filter(lambda x: x.type in types + or x.supertype in types, self.table.values())) + return messages[start:end] + + def __contains__(self: MessageTable, node_id: int) -> bool: + return node_id in self.table + + def __iter__(self: MessageTable) -> MessageTable: + self.__keys = sorted(list(self.table.keys())) + return self + + def __next__(self: MessageTable) -> Message: + if(self.__start == self.__stop): + raise StopIteration() + message = self.table[self.__keys[self.__start]] + self.__start += 1 + return message + + def __call__(self: MessageTable, + start: int, + stop: int = None) -> MessageTable: + self.__stop = stop if stop < len(self.table) else len(self.table) + self.__start = start if start < self.__stop else self.__stop + return self diff --git a/canopen_monitor/canmsgs/__init__.py b/canopen_monitor/canmsgs/__init__.py deleted file mode 100755 index 35d60af..0000000 --- a/canopen_monitor/canmsgs/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from .canmsg import CANMsg, MessageType -from .canmsg_table import CANMsgTable -from .magic_can_bus import MagicCANBus - -__all__ = [ - 'CANMsg', - 'CANMsgTable', - 'MessageType', - 'MagicCANBus', -] diff --git a/canopen_monitor/canmsgs/canmsg.py b/canopen_monitor/canmsgs/canmsg.py deleted file mode 100755 index ac1424e..0000000 --- a/canopen_monitor/canmsgs/canmsg.py +++ /dev/null @@ -1,189 +0,0 @@ -import datetime -from enum import Enum -import pyvit.can as pc - -node_ranges = [(0x0, 0x0), # NMT - (0x1, 0x7F), # SYNC - (0x100, 0x100), # TIME - (0x80, 0x0FF), # EMER - (0x180, 0x1FF), # PDO1_TX - (0x200, 0x27F), # PDO1_RX - (0x280, 0x2FF), # PDO2_TX - (0x300, 0x37F), # PDO2_RX - (0x380, 0x3FF), # PDO3_TX - (0x400, 0x47F), # PDO3_RX - (0x480, 0x4FF), # PDO4_TX - (0x500, 0x57F), # PDO4_RX - (0x580, 0x5FF), # SDO_TX - (0x600, 0x680), # SDO_RX - (0x700, 0x7FF)] # HEARTBEAT - - -class MessageType(Enum): - NMT = 0 - SYNC = 1 - TIME = 2 - EMER = 3 - PDO1_TX = 4 - PDO1_RX = 5 - PDO2_TX = 6 - PDO2_RX = 7 - PDO3_TX = 8 - PDO3_RX = 9 - PDO4_TX = 10 - PDO4_RX = 11 - SDO_TX = 12 - SDO_RX = 13 - HEARTBEAT = 14 - UKNOWN = 15 - - PDO = 16 # Super type PDO - SDO = 17 # Super type SDO - - def super_type(self): - if(self.value >= 4 and self.value <= 11): - return MessageType(16) - elif(self.value >= 12 and self.value <= 13): - return MessageType(17) - else: - return MessageType(self.value) - - def cob_id_to_type(cob_id: int): - """ - A static function for turning a COB ID into a MessageType. - - Arguments - --------- - cob_id `int`: The COB ID of the message. - - Returns - ------- - `MessageType`: The message type of the the message based on the COB ID. - """ - # Determine a node type the cob id fits into - # and return the matching type - for i, range in enumerate(node_ranges): - if(cob_id >= range[0] and cob_id <= range[1]): - return MessageType(i) - - # If the cob id matched no range then return the unknown type - return MessageType(15) - - def cob_id_to_node_id(cob_id: int) -> int: - """ - A static function for turning a COB ID into a Node ID. - - Arguments - --------- - cob_id `int`: The COB ID of the message. - - Returns - ------- - `int`: The Node ID of the message. - """ - # Determine a node type the cob id fits into and return the node id - for range in node_ranges: - if(cob_id >= range[0] and cob_id <= range[1]): - return cob_id - range[0] - - # If the cob id matched no range then return None - return None - - def __str__(self) -> str: - return self.name - - -class CANMsg(pc.Frame): - """ - Models a raw CANopen Message recieved from the CAN Bus - """ - - def __init__(self, - src: pc.Frame, - interface: str, - stale_timeout: int, - dead_timeout: int): - """ - CANMsg Frame initialization.abs($0) - - Arguments - ---------- - - src `pyvit.can.Frame`: The raw Frame read off of the CAN bus. - interface `str`: The name of the interface that src was read from. - """ - super().__init__(src.arb_id, - data=src.data, - frame_type=src.frame_type, - interface=interface, - timestamp=datetime.datetime.now(), - extended=src.is_extended_id) - self.message_type = MessageType.cob_id_to_type(src.arb_id) - self.stale_timeout = stale_timeout - self.dead_timeout = dead_timeout - node_name = MessageType.cob_id_to_node_id(src.arb_id) - self.node_name = hex(node_name) \ - if node_name is not None else hex(src.arb_id) - self.parsed_msg = "" - - def __str__(self): - """ - Overloaded str opeartor. - - Returns - ------- - `str`: A string representation of a CANMsg. - """ - attrs = [] - for k, v in self.__dict__.items(): - attrs += ['{}={}'.format(k, v)] - return "".format(self.message_type, - self.arb_id, - self.status) - - def __le__(self, operand) -> bool: - """ - Arguments - ---------- - operand `CANMsg`: The CAN message to comare this object against. - - Returns - ------- - `bool`: An indication of whether or not this object has a lesser or - equal COB ID than the specified operand. - """ - return self.arb_id <= operand.arb_id - - @property - def status(self) -> str: - """ - Returns - ------- - `str`: A string indication of the CAN message's current status. - """ - if(self._is_dead()): - return 'DEAD' - elif(self._is_stale()): - return 'STALE' - else: - return 'ALIVE' - - def _is_stale(self) -> bool: - """ - Returns - ------- - `bool`: An indication of whether or not this message is older than the - configured stale timeout time. - """ - return (datetime.datetime.now() - self.timestamp) \ - .total_seconds() >= self.stale_timeout - - def _is_dead(self) -> bool: - """ - Returns - ------- - `bool`: An indication of whether or not this message is older than the - configured dead timeout time. - """ - return (datetime.datetime.now() - self.timestamp) \ - .total_seconds() >= self.dead_timeout diff --git a/canopen_monitor/canmsgs/canmsg_table.py b/canopen_monitor/canmsgs/canmsg_table.py deleted file mode 100755 index b0825b9..0000000 --- a/canopen_monitor/canmsgs/canmsg_table.py +++ /dev/null @@ -1,99 +0,0 @@ -from __future__ import annotations -from .canmsg import CANMsg -import typing -from collections.abc import Iterable, Iterator - - -class CANMsgTable(Iterable): - """ - Table of CAN Messages - """ - - def __init__(self: CANMsgTable, capacity: int = None): - self.__message_table = {} - self.capacity = capacity - - def __sort__(self: CANMsgTable) -> [int]: - """ - Overloaded sort function - Sort by COB-ID - - Returns - -------- - [int]: List of keys sorted - """ - return sorted(self.__message_table.keys()) - - def __add__(self: CANMsgTable, frame: CANMsg) -> CANMsgTable: - """ - Overloaded add operator - allows for following: - CANMsgTable += CANMsg - - Arguments - ---------- - frame: CANMsg to be added - - Returns - -------- - CANMsgTable: returns self after adding message - """ - if self.capacity is not None: - if (len(self.__message_table) < self.capacity - or (self.__message_table.get(frame.arb_id) is not None)): - self.__message_table[frame.arb_id] = frame - else: - self.__message_table[frame.arb_id] = frame - - return self - - def __len__(self: CANMsgTable) -> int: - """ - Overloaded len function - - Returns - -------- - int: Number of CANMsg records in table - """ - return len(self.__message_table) - - def __str__(self: CANMsgTable) -> str: - """ - Overloaded str function - - Returns - -------- - str: String representation of CANMsgTable - """ - attrs = [] - for k, v in self.__dict__.items(): - attrs += ['{}={}'.format(k, v)] - return 'CANMsgTable {}\n\n'.format(', '.join(attrs)) - - def __getitem__(self: CANMsgTable, key: typing.Union[int, str]) -> \ - typing.Union[CANMsg, None]: - """ - Overloaded getitem operator - Example: CANMsgTable[0x40] - - Arguments - ---------- - key: int or string representation of node COB-ID - - Returns - -------- - CANMsg: last message added for the provided COB-ID - None: None will be returned if no messages exist for provided COB-ID - """ - sub_key = int(key, 16) if type(key) is str else key - return self.__message_table.get(sub_key) - - def __iter__(self: CANMsgTable) -> Iterator[CANMsg]: - """ - Overloaded iter function - - Returns - -------- - Iterator[CANMsg]: iterator for contained messages - """ - return self.__message_table.__iter__() diff --git a/canopen_monitor/canmsgs/magic_can_bus.py b/canopen_monitor/canmsgs/magic_can_bus.py deleted file mode 100755 index 3550f05..0000000 --- a/canopen_monitor/canmsgs/magic_can_bus.py +++ /dev/null @@ -1,130 +0,0 @@ -import queue as q -import threading as t -from typing import Union -import pyvit.hw.socketcan as phs -from .. import DEBUG, TIMEOUT, canmsgs - - -class MagicCANBus: - """ - A magic bus-sniffer that reads activity on the CAN bus and subsequently - loads a thread-safe queue of CANMsg's - - Parameters - ---------- - interfaces `[pyvit.bus.Bus]`: A list of Bus objects that the Magic CAN Bus - will monitor. - - frames `queue.Queue`: The thread-safe queue of CANMsg objects to pull from. - - failed_interfaces `[str]`: A list of interface names that the Magic CAN Bus - failed to connect to. - - stop_listening `threading.Event`: A thread-safe event that triggers when - it's time to shut down all of the bus listeners. - - block `bool`: A flag for determining whether or not the Magic CAN Bus - should block when checking for CAN messages. - - threads `[threading.Thread]`: A list of bus-listener worker threads. - """ - - def __init__(self, - interface_names: [str] = [], - block: bool = True, - stale_timeout: int = 60, - dead_timeout: int = 120): - """ - Magic CAN Bus initialization. - - Arguments - --------- - interface_names `[str]`: A list of interfaces to try and connect to. - block `bool`: A flag for determining whether or not the Magic CAN Bus - should block when checking for CAN messages. - """ - # Bus things - self.interfaces = [] - self.frames = q.Queue() - self.failed_interfaces = [] - self.stale_timeout = stale_timeout - self.dead_timeout = dead_timeout - - # Threading things - self.stop_listening = t.Event() - self.block = block - self.threads = [] - - # Start all of the devices specified - for name in interface_names: - self.start(name) - - def start(self, dev_name): - try: - dev = phs.SocketCanDev(dev_name) - dev.start() - self.interfaces.append(dev) - dev_listener = t.Thread(target=self._listen, - args=[dev]) - dev_listener.setDaemon(True) - dev_listener.start() - self.threads.append(dev_listener) - except OSError: - self.failed_interfaces.append(dev_name) - - def _stop(self, dev): - self.interfaces.remove(dev) - - def stop_all(self) -> None: - """ - Remove all devices from the device table - """ - for dev in self.interfaces: - self._stop(dev) - - self.stop_listening.set() - - if(DEBUG): - print('waiting for ' - + str(len(self.threads)) - + ' bus-threads to close.') - if(len(self.threads) > 0): - for thread in self.threads: - thread.join(TIMEOUT) - if(thread.is_alive() and DEBUG): - print('the bus thread listener with pid ({}) took too long' - ' to close, will try again in {}s!' - .format(thread.native_id, - round(TIMEOUT * len(self.threads), 3))) - if(DEBUG): - print('all bus threads closed gracefully!') - else: - if(DEBUG): - print('no child bus threads were spawned!') - - def _listen(self, dev: phs.SocketCanDev) -> None: - try: - while not self.stop_listening.is_set(): - self.frames.put([dev.recv(), dev.ndev], block=self.block) - except q.Full: - pass - except OSError: - self._stop(dev) - - def receive(self) -> Union[canmsgs.CANMsg, None]: - """ - Returns the first available CANMsg retrieved from the bus if any. - If no messages are available on the bus, None is returned - """ - try: - res = self.frames.get(block=self.block, - timeout=TIMEOUT) - return canmsgs.CANMsg(res[0], - res[1], - self.stale_timeout, - self.dead_timeout) - except q.Empty: - return None - - def running(self) -> [str]: - return list(filter(lambda x: x.running, self.interfaces)) diff --git a/canopen_monitor/monitor_app.py b/canopen_monitor/monitor_app.py deleted file mode 100755 index f62321f..0000000 --- a/canopen_monitor/monitor_app.py +++ /dev/null @@ -1,223 +0,0 @@ -import time -import curses -import threading -import canopen_monitor -from .ui.pane import CANMsgPane -from .ui.windows import PopupWindow -from .ui.grid import Grid, Split -from .parser.canopen import CANOpenParser -from .canmsgs.magic_can_bus import MagicCANBus - - -class MonitorApp: - """The top-level application of Can Monitor that manages the middleware - resoruces and the UI elements. - """ - - def __init__(self, - devices: [str], - timeouts: tuple, - table_schema: dict, - eds_configs: dict): - # Monitor setup - self.screen = curses.initscr() # Initialize standard out - self.screen.scrollok(True) # Enable window scroll - self.screen.keypad(True) # Enable special key input - self.screen.nodelay(True) # Disable user-input blocking - - # Bus things - self.devices = devices - self.bus = MagicCANBus(interface_names=self.devices, - stale_timeout=timeouts[0], - dead_timeout=timeouts[1]) - self.parser = CANOpenParser(eds_configs) - - # panel selection things - self.panel_index = 0 # Index to get to selected panel - self.panel_flatlist = [] # List of all Panes contained in parent - self.selected = None # Reference to currently selected pane - - # Threading things - self.screen_lock = threading.Lock() - self.stop_listening = threading.Event() - - # Curses configuration - curses.savetty() # Save the terminal state - curses.raw() # Enable raw input (DISABLES SIGNALS) - curses.noecho() # Disable user-input echo - curses.cbreak() # Disable line-buffering (less input delay) - curses.curs_set(False) # Disable the cursor display - - # Curses colors - curses.start_color() - curses.init_pair(1, curses.COLOR_GREEN, curses.COLOR_BLACK) - curses.init_pair(2, curses.COLOR_YELLOW, curses.COLOR_BLACK) - curses.init_pair(3, curses.COLOR_RED, curses.COLOR_BLACK) - curses.init_pair(4, curses.COLOR_CYAN, curses.COLOR_BLACK) - curses.init_pair(5, curses.COLOR_MAGENTA, curses.COLOR_BLACK) - - # Construct the grid(s) - self.construct_grid(table_schema) - - def start(self): - while not self.stop_listening.is_set(): - # Get CanBus input - data = self.bus.receive() - if(data is not None): - self.parent.add_frame(data) - - # Get user input - self.read_input() - - # Draw the screen - self.screen_lock.acquire() - self.draw_banner() - self.parent.draw() - self.screen_lock.release() - - def stop(self): - curses.nocbreak() # Re-enable line-buffering - curses.echo() # Enable user-input echo - curses.curs_set(True) # Enable the cursor - curses.resetty() # Restore the terminal state - curses.endwin() # Destroy the virtual screen - self.stop_listening.set() # Signal the bus threads to stop - - if(canopen_monitor.DEBUG): # Extra canopen_monitor.DEBUG info - print('stopping bus-listeners from the app-layer...') - - self.bus.stop_all() # Wait for all CanBus threads to stop - - if(canopen_monitor.DEBUG): # Extra canopen_monitor.DEBUG info - print('stopped all bus-listeners!') - - threads = threading.enumerate().remove(threading.current_thread()) - if(canopen_monitor.DEBUG): # Extra canopen_monitor.DEBUG info - print('waiting for all app-threads to close...') - - # If app-layer threads exist wait for them to close - if(threads is not None): - for thread in threads: - thread.join() - if(canopen_monitor.DEBUG): # Extra canopen_monitor.DEBUG info - print('stopped all app-threads gracefully!') - - elif(canopen_monitor.DEBUG): # Extra canopen_monitor.DEBUG info - print('no child app-threads were spawned!') - - def read_input(self): - # Grab new user input and immediately flush the buffer - key = self.screen.getch() - curses.flushinp() - - # Determine the key input - if(key == curses.KEY_RESIZE): - self.screen.clear() - self.parent.clear() - self.parent.resize(self.screen) - elif(key == curses.KEY_F1): - window_message = '\n'.join([canopen_monitor.APP_DESCRIPTION, - 'Author: ' + canopen_monitor.APP_AUTHOR, - 'Email: ' + canopen_monitor.APP_EMAIL, - 'License: ' + canopen_monitor.APP_LICENSE, - 'Version: ' + canopen_monitor.APP_VERSION, - 'Website: ' + canopen_monitor.APP_URL]) - PopupWindow(self.screen, - window_message, - banner='About ' + canopen_monitor.APP_NAME, - color_pair=1) - elif(key == curses.KEY_F2): - PopupWindow(self.screen, ": Exit program\ - \n\nInfo:\ - \n\t: App Info\ - \n\t: Controls\ - \n\nMovement:\ - \n\t: Scroll up\ - \n\t: Scroll down\ - \n\t: Fast scroll up\ - \n\t: Fast scroll down\ - \n\t: Select previous pane\ - \n\t: Select next pane", - banner='Controls', - color_pair=1) - elif((key == curses.KEY_SR or key == curses.KEY_SLEFT) - and self.panel_index > 0): - self.panel_index -= 1 - self.update_selected_panel() - elif((key == curses.KEY_SF or key == curses.KEY_SRIGHT) - and self.panel_index < len(self.panel_flatlist) - 1): - self.panel_index += 1 - self.update_selected_panel() - elif(key == curses.KEY_UP): - self.selected.scroll_up() - elif(key == curses.KEY_DOWN): - self.selected.scroll_down() - elif(key == 567 or key == 546): # Ctrl+Up or Ctrl+Left - self.selected.scroll_up(rate=10) - elif(key == 526 or key == 561): # Ctrl+Down or Ctrl+Right - self.selected.scroll_down(rate=10) - elif(key == curses.KEY_LEFT): - self.selected.scroll_left() - elif(key == curses.KEY_RIGHT): - self.selected.scroll_right() - - def draw_banner(self): - _, width = self.screen.getmaxyx() - self.screen.addstr(0, 0, time.ctime(), curses.color_pair(0)) - self.screen.addstr(" | ") - - running = list(map(lambda x: x.ndev, self.bus.running())) - for dev in self.devices: - if(dev in running): - color = 1 - else: - color = 3 - - self.screen.addstr(dev + " ", curses.color_pair(color)) - - hottip = ' ' - self.screen.addstr(0, width - len(hottip), hottip) - - def update_selected_panel(self): - if(self.selected is not None): - self.selected.selected = False - self.selected = self.panel_flatlist[self.panel_index] - self.selected.selected = True - - def construct_grid(self, schema, parent=None): - # Required attributes - type = schema.get('type') - split = schema.get('split') - data = schema.get('data') - split = {'horizontal': Split.HORIZONTAL, - 'vertical': Split.VERTICAL}.get(split) - - # Optional attributes - name = schema.get('name') - fields = schema.get('fields') - capacity = schema.get('capacity') - frame_types = schema.get('frame_types') - - if(parent is None): - self.parent = Grid(parent=self.screen, split=split) - - for entry in data: - self.construct_grid(entry, self.parent) - self.panel_flatlist = self.parent.flatten() - self.update_selected_panel() - else: - if(type == 'grid'): - component = Grid(split=split) - - for entry in data: - self.construct_grid(entry, component) - elif(type == 'message_table'): - component = CANMsgPane(name, - self.parser, - capacity=capacity, - fields=fields, - frame_types=frame_types) - else: - raise ValueError('Failed to parse layout! Invalid table type: {}' - .format(type)) - parent.add_panel(component) diff --git a/canopen_monitor/parser/__init__.py b/canopen_monitor/parse/__init__.py old mode 100755 new mode 100644 similarity index 90% rename from canopen_monitor/parser/__init__.py rename to canopen_monitor/parse/__init__.py index bba9ce1..e8027a3 --- a/canopen_monitor/parser/__init__.py +++ b/canopen_monitor/parse/__init__.py @@ -1,10 +1,15 @@ +"""This module is primarily responsible for providing a high-level interface +for parsing CANOpen messages according to Ojbect Definiton files or Electronic +Data Sheet files, provided by the end user. +""" import enum from re import finditer -from .eds import load_eds_file +from .eds import EDS, load_eds_file from .canopen import CANOpenParser __all__ = [ 'CANOpenParser', + 'EDS', 'load_eds_file', ] diff --git a/canopen_monitor/parse/canopen.py b/canopen_monitor/parse/canopen.py new file mode 100644 index 0000000..8d6177c --- /dev/null +++ b/canopen_monitor/parse/canopen.py @@ -0,0 +1,45 @@ +from ..can import Message, MessageType +from . import hb as HBParser, \ + pdo as PDOParser, \ + sync as SYNCParser, \ + emcy as EMCYParser, \ + time as TIMEParser +from .sdo import SDOParser +from .utilities import FailedValidationError + + +class CANOpenParser: + def __init__(self, eds_configs: dict): + self.sdo_parser = SDOParser() + self.eds_configs = eds_configs + + def parse(self, message: Message) -> str: + node_id = message.node_id + eds_config = self.eds_configs.get(hex(node_id)) \ + if node_id is not None else None + + if (message.type == MessageType.SYNC): + parse = SYNCParser.parse + elif (message.type == MessageType.EMER): + parse = EMCYParser.parse + elif (message.supertype == MessageType.PDO): + parse = PDOParser.parse + elif (message.supertype == MessageType.SDO): + if self.sdo_parser.is_complete: + self.sdo_parser = SDOParser() + parse = self.sdo_parser.parse + elif (message.type == MessageType.HEARTBEAT): + parse = HBParser.parse + elif (message.type == MessageType.TIME): + parse = TIMEParser.parse + else: + parse = None + + try: + parsed_message = parse(message.arb_id, message.data, eds_config) + except (FailedValidationError, TypeError): + parsed_message = ' '.join(list(map(lambda x: hex(x)[2:] + .upper() + .rjust(2, '0'), + message.data))) + return parsed_message diff --git a/canopen_monitor/parser/eds.py b/canopen_monitor/parse/eds.py old mode 100755 new mode 100644 similarity index 81% rename from canopen_monitor/parser/eds.py rename to canopen_monitor/parse/eds.py index 54689b0..92dad30 --- a/canopen_monitor/parser/eds.py +++ b/canopen_monitor/parse/eds.py @@ -1,8 +1,6 @@ -"""EDS File Parser Interface""" import string from typing import Union - -import canopen_monitor.parser as cmp +import canopen_monitor.parse as cmp from dateutil.parser import parse as dtparse @@ -79,12 +77,11 @@ def __len__(self) -> int: class EDS: def __init__(self, eds_data: [str]): - """ - Parse the array of EDS lines into a dictionary of Metadata/Index objects. + """Parse the array of EDS lines into a dictionary of Metadata/Index + objects. - Parameters - ---------- - eds_data: `[str]` The list of raw lines from the EDS file. + :param eds_data: The list of raw lines from the EDS file. + :type eds_data: [str] """ self.indices = {} @@ -98,32 +95,33 @@ def __init__(self, eds_data: [str]): if len(id) == 1: self.indices[hex(int(id[0], 16))] = Index(section[1:]) else: - self.indices[hex(int(id[0], 16))].add(Index(section[1:], sub_id=int(id[1], 16))) + self.indices[hex(int(id[0], 16))] \ + .add(Index(section[1:], sub_id=int(id[1], 16))) else: name = section[0][1:-1] self.__setattr__(cmp.camel_to_snake(name), Metadata(section[1:])) prev = i + 1 + self.node_id = self[0x2101].default_value def __len__(self) -> int: return sum(map(lambda x: len(x), self.indices.values())) def __getitem__(self, key: Union[int, str]) -> Index: - return self.indices.get(hex(int(str(key), 16))) + callable = hex if type(key) == int else str + return self.indices.get(callable(key)) def load_eds_file(filepath: str) -> EDS: - """ - Read in the EDS file, grab the raw lines, strip them of all escaped - characters, then serialize into an `EDS` and return the resulpythting object. + """Read in the EDS file, grab the raw lines, strip them of all escaped + characters, then serialize into an `EDS` and return the resulpythting + object. - Parameters - ---------- - filepath: `str` Path to an eds file. + :param filepath: Path to an eds file + :type filepath: str - Returns - ------- - `EDS`: The succesfully serialized EDS file. + :return: The succesfully serialized EDS file. + :rtype: EDS """ with open(filepath) as file: return EDS(list(map(lambda x: x.strip(), file.read().split('\n')))) diff --git a/canopen_monitor/parser/emcy.py b/canopen_monitor/parse/emcy.py old mode 100755 new mode 100644 similarity index 96% rename from canopen_monitor/parser/emcy.py rename to canopen_monitor/parse/emcy.py index c5bc58e..011ab61 --- a/canopen_monitor/parser/emcy.py +++ b/canopen_monitor/parse/emcy.py @@ -1,5 +1,5 @@ -from canopen_monitor.parser.eds import EDS -from canopen_monitor.parser.utilities import FailedValidationError +from .eds import EDS +from .utilities import FailedValidationError def parse(cob_id: int, data: list, eds: EDS): diff --git a/canopen_monitor/parser/hb.py b/canopen_monitor/parse/hb.py old mode 100755 new mode 100644 similarity index 87% rename from canopen_monitor/parser/hb.py rename to canopen_monitor/parse/hb.py index 890fbe3..905c2ed --- a/canopen_monitor/parser/hb.py +++ b/canopen_monitor/parse/hb.py @@ -1,6 +1,6 @@ from .eds import EDS from .utilities import FailedValidationError -from ..canmsgs import MessageType +from ..can import MessageType def parse(cob_id: int, data: list, eds_config: EDS): @@ -21,7 +21,7 @@ def parse(cob_id: int, data: list, eds_config: EDS): 0x05: "Operational", 0x7F: "Pre-operational" } - node_id = MessageType.cob_id_to_node_id(cob_id) + node_id = MessageType.cob_to_node(MessageType.HEARTBEAT, cob_id) if len(data) < 1 or data[0] not in states: raise FailedValidationError(data, node_id, cob_id, __name__, "Invalid heartbeat state detected") diff --git a/canopen_monitor/parser/pdo.py b/canopen_monitor/parse/pdo.py old mode 100755 new mode 100644 similarity index 100% rename from canopen_monitor/parser/pdo.py rename to canopen_monitor/parse/pdo.py diff --git a/canopen_monitor/parser/sdo.py b/canopen_monitor/parse/sdo.py old mode 100755 new mode 100644 similarity index 100% rename from canopen_monitor/parser/sdo.py rename to canopen_monitor/parse/sdo.py diff --git a/canopen_monitor/parser/sync.py b/canopen_monitor/parse/sync.py old mode 100755 new mode 100644 similarity index 100% rename from canopen_monitor/parser/sync.py rename to canopen_monitor/parse/sync.py diff --git a/canopen_monitor/parser/time.py b/canopen_monitor/parse/time.py similarity index 100% rename from canopen_monitor/parser/time.py rename to canopen_monitor/parse/time.py diff --git a/canopen_monitor/parser/utilities.py b/canopen_monitor/parse/utilities.py old mode 100755 new mode 100644 similarity index 98% rename from canopen_monitor/parser/utilities.py rename to canopen_monitor/parse/utilities.py index 35d76ef..bdf5fe9 --- a/canopen_monitor/parser/utilities.py +++ b/canopen_monitor/parse/utilities.py @@ -1,7 +1,7 @@ import array import datetime from struct import unpack -from canopen_monitor.parser.eds import EDS +from .eds import EDS from typing import List diff --git a/canopen_monitor/parser/canopen.py b/canopen_monitor/parser/canopen.py deleted file mode 100755 index f3e595b..0000000 --- a/canopen_monitor/parser/canopen.py +++ /dev/null @@ -1,48 +0,0 @@ -from ..canmsgs import CANMsg, MessageType -from . import hb as HBParser, \ - pdo as PDOParser, \ - sync as SYNCParser, \ - emcy as EMCYParser, \ - time as TIMEParser -from .sdo import SDOParser -from .utilities import FailedValidationError - - -class CANOpenParser: - def __init__(self, eds_configs: dict): - self.sdo_parser = SDOParser() - self.eds_configs = eds_configs - - def parse(self, msg: CANMsg) -> [str, str]: - node_id = MessageType.cob_id_to_node_id(msg.arb_id) - eds_config = self.eds_configs.get(hex(node_id)) \ - if node_id is not None else None - - if (eds_config is not None): - msg.node_name = eds_config.device_info.product_name - - if (msg.message_type == MessageType.UKNOWN): - return [str(msg.message_type), str(hex(msg.arb_id))] - elif (msg.message_type == MessageType.SYNC): - parse = SYNCParser.parse - elif (msg.message_type == MessageType.EMER): - parse = EMCYParser.parse - elif (msg.message_type.super_type() == MessageType.PDO): - parse = PDOParser.parse - elif (msg.message_type.super_type() == MessageType.SDO): - if self.sdo_parser.is_complete: - self.sdo_parser = SDOParser() - parse = self.sdo_parser.parse - elif (msg.message_type == MessageType.HEARTBEAT): - parse = HBParser.parse - elif (msg.message_type == MessageType.TIME): - parse = TIMEParser.parse - else: - return ["Unknown", str(hex(msg.arb_id))] - - try: - message = parse(msg.arb_id, msg.data, eds_config) - except FailedValidationError: - message = str(list(map(lambda x: hex(x), msg.data))) - - return [message, msg.node_name] diff --git a/canopen_monitor/ui/__init__.py b/canopen_monitor/ui/__init__.py index e69de29..266c6f5 100755 --- a/canopen_monitor/ui/__init__.py +++ b/canopen_monitor/ui/__init__.py @@ -0,0 +1,12 @@ +"""This module is responsible for providing a high-level interface for elements +of Curses UI and general user interaction with the app, +""" +from .pane import Pane +from .windows import PopupWindow +from .message_pane import MessagePane + +__all__ = [ + "Pane", + "MessagePane", + "PopupWindow" +] diff --git a/canopen_monitor/ui/message_pane.py b/canopen_monitor/ui/message_pane.py new file mode 100644 index 0000000..422082d --- /dev/null +++ b/canopen_monitor/ui/message_pane.py @@ -0,0 +1,213 @@ +from __future__ import annotations +from .pane import Pane +from ..can import Message, MessageType, MessageTable +import curses + + +class MessagePane(Pane): + """A derivative of Pane customized specifically to list miscellaneous CAN + messages stored in a MessageTable + + :param name: The name of the pane (to be printed in the top left) + :type name: str + + :param cols: A dictionary describing the pane layout. The key is the Pane + collumn name, the value is a tuple containing the Message attribute to + map the collumn to, and the max collumn width respectively. + :type cols: dict + + :param selected: An indicator that the current Pane is selected + :type selected: bool + + :param table: The message table + :type table: MessageTable + """ + + def __init__(self: MessagePane, + cols: dict, + types: [MessageType], + name: str = '', + parent: any = None, + height: int = 1, + width: int = 1, + y: int = 0, + x: int = 0, + message_table: MessageTable = MessageTable()): + super().__init__(parent=(parent or curses.newpad(0, 0)), + height=height, + width=width, + y=y, + x=x) + + # Pane details + self._name = name + self.cols = cols + self.types = types + self.__top = 0 + self.__top_max = 0 + self.__col_sep = 2 + self.__header_style = curses.color_pair(4) + self.table = message_table + + # Cursor stuff + self.cursor = 0 + self.cursor_min = 0 + self.cursor_max = self.d_height - 10 + + # Reset the collumn widths to the minimum size of the collumn names + self.__reset_col_widths() + + def resize(self: MessagePane, height: int, width: int) -> None: + """A wrapper for `Pane.resize()`. This intercepts a call for a resize + in order to upate MessagePane-specific details that change on a resize + event. The parent `resize()` gets called first and then MessagePane's + details are updated. + + :param height: New virtual height + :type height: int + + :param width: New virtual width + :type width: int + """ + super().resize(height, width) + p_height = self.d_height - 3 + table_size = len(self.table.filter(self.types)) + occluded = table_size - self.__top - self.d_height + 3 + + self.cursor_max = table_size if table_size < p_height else p_height + self.__top_max = occluded if occluded > 0 else 0 + + def _reset_scroll_positions(self: MessagePane) -> None: + self.cursor = self.cursor_max + self.scroll_position_y = 0 + self.scroll_position_x = 0 + + @property + def scroll_limit_y(self: MessagePane) -> int: + """The maximim rows the pad is allowed to shift by when scrolling + """ + return self.d_height - 2 + + @property + def scroll_limit_x(self: MessagePane) -> int: + """The maximim columns the pad is allowed to shift by when scrolling + """ + max_length = sum(list(map(lambda x: x[1], self.cols.values()))) + occluded = max_length - self.d_width + 7 + return occluded if(occluded > 0) else 0 + + def scroll_up(self: MessagePane, rate: int = 1) -> None: + """This overrides `Pane.scroll_up()`. Instead of shifting the + pad vertically, the slice of messages from the `MessageTable` is + shifted. + + :param rate: Number of messages to scroll by + :type rate: int + """ + # Record current cursor info for later scroll calculations + prev = self.cursor + min = 0 + + # Move the cursor + self.cursor -= rate + + # If the cursor is less than the minimum, reset it to the minimum then + # do calculations for shifting the message table + if(self.cursor < self.cursor_min): + self.cursor = self.cursor_min + + # Deduct the amount of cursor movement from the message table + # movement and reset shift to bounds if need be + leftover = rate - prev + self.__top -= leftover + self.__top = min if(self.__top < min) else self.__top + + def scroll_down(self: MessagePane, rate: int = 1) -> None: + """This overrides `Pane.scroll_up()`. Instead of shifting the + pad vertically, the slice of messages from the `MessageTable` is + shifted. + + :param rate: Number of messages to scroll by + :type rate: int + """ + # Record current cursor info for later scroll calculations + prev = self.cursor + max = self.__top + self.__top_max + + # Move the cursor + self.cursor += rate + + # If the cursor is greater than the maximum, reset it to the minimum + # then do calculations for shifting the message table + if(self.cursor > (self.cursor_max - 1)): + self.cursor = self.cursor_max - 1 + + # Deduct the amount of cursor movement from the message table + # movement and reset shift to bounds if need be + leftover = rate - (self.cursor - prev) + self.__top += leftover + self.__top = max if(self.__top > max) else self.__top + + def __draw_header(self: Pane) -> None: + """Draw the table header at the top of the Pane + + This uses the `cols` dictionary to determine what to write + """ + self.add_line(0, + 2, + f'{self._name}:' + f' ({len(self.table.filter(self.types))} messages)', + highlight=self.selected) + + pos = 1 + for name, data in self.cols.items(): + self.add_line(1, + pos, + f'{name}:'.ljust(data[1] + self.__col_sep, ' '), + highlight=True, + color=curses.color_pair(4)) + pos += data[1] + self.__col_sep + + def draw(self: MessagePane) -> None: + """Draw all records from the MessageTable to the Pane + """ + super().draw() + self.resize(self.v_height, self.v_width) + + # Get the messages to be displayed based on scroll positioning, + # and adjust column widths accordingly + draw_messages = self.table.filter(self.types, + self.__top, + self.__top + self.d_height - 3) + self.__check_col_widths(draw_messages) + + # Draw the header and messages + self.__draw_header() + for i, message in enumerate(draw_messages): + pos = 1 + for name, data in self.cols.items(): + attr = getattr(message, data[0]) + callable = data[2] if (len(data) == 3) else str + self.add_line(2 + i, + pos, + callable(attr).ljust(data[1] + self.__col_sep, + ' '), + highlight=((self.cursor == i) and self.selected)) + pos += data[1] + self.__col_sep + + # Refresh the Pane and end the draw cycle + super().refresh() + + def __reset_col_widths(self: Message): + for name, data in self.cols.items(): + self.cols[name] = (data[0], len(name), data[2]) \ + if (len(data) == 3) else (data[0], len(name)) + + def __check_col_widths(self: MessagePane, messages: [Message]) -> None: + for message in messages: + for name, data in self.cols.items(): + attr = getattr(message, data[0]) + attr_len = len(str(attr)) + if(data[1] < attr_len): + self.cols[name] = (data[0], attr_len) + super().clear() diff --git a/canopen_monitor/ui/pane.py b/canopen_monitor/ui/pane.py index 4586d39..222d832 100755 --- a/canopen_monitor/ui/pane.py +++ b/canopen_monitor/ui/pane.py @@ -1,94 +1,168 @@ from __future__ import annotations -from ..canmsgs import CANMsgTable, CANMsg, MessageType -from ..parser import CANOpenParser import curses from abc import ABC, abstractmethod class Pane(ABC): - """ - Abstract Pane Class, contains a PAD and a window. - """ + """Abstract Pane Class, contains a PAD and a window - def __init__(self: Pane, border: bool = True, color_pair: int = 4): - """ - Abstract pane initialization + :param v_height: The virtual height of the embedded pad + :type v_height: int + + :param v_width: The virtual width of the embedded pad + :type v_width: int - Arguments - ---------- + :param d_height: The drawn height of the embedded pad + :type d_height: int + + :param d_width: The drawn width of the embedded pad + :type d_width: int + + :param border: A style option for drawing a border around the pane + :type border: bool + """ - border: boolean definition of whether or not to display border - color_pair: curses color pair to use (must be implemented prior) + def __init__(self: Pane, + parent: any = None, + height: int = 1, + width: int = 1, + y: int = 0, + x: int = 0, + border: bool = True, + color_pair: int = 0): + """Abstract pane initialization + + :param border: Toggiling whether or not to draw a border + :type border: bool + :value border: True + + :param color_pair: The color pair bound in curses config to use + :type color_pair: int + :value color_pair: 4 """ - self._pad = curses.newpad(1, 1) + # Set virtual dimensions + self.v_height = height + self.v_width = width + self.y = y + self.x = x + + # Set or create the parent window + self.parent = parent or curses.newwin(self.v_height, self.v_width) + self._pad = curses.newpad(self.v_height, self.v_width) self._pad.scrollok(True) - self.parent = curses.newwin(0, 0) - self.__border = border - self.v_width = 0 - self.v_height = 0 - # Pane state + # Set the draw dimensions + self.__reset_draw_dimensions() + + # Pane style options and state details + self.border = border self.selected = False + self._style = curses.color_pair(color_pair) self.needs_refresh = False self.scroll_position_y = 0 self.scroll_position_x = 0 - self.scroll_limit_y = 0 - self.scroll_limit_x = 0 - # Draw Style - self._style = curses.color_pair(color_pair) + @property + def scroll_limit_y(self: Pane) -> int: + return 0 + + @property + def scroll_limit_x(self: Pane) -> int: + return 0 @abstractmethod def draw(self: Pane) -> None: - """ - Abstract draw method, must be overwritten in child class - draw should first resize the pad using: self._pad.resize() + """Abstract draw method, must be overwritten in child class + draw should first resize the pad using: `super().resize(w, h)` then add content using: self._pad.addstr() - then refresh using: self._pad.refresh() + then refresh using: `super().refresh()` abstract method will clear and handle border child class should also set _scroll_limit_x and _scroll_limit_y here """ if self.needs_refresh: - self.clear() + self.refresh() self.parent.attron(self._style) self._pad.attron(self._style) - if self.__border: - self.parent.box() + if(self.border): + self._pad.box() - def clear(self: Pane) -> None: + def resize(self: Pane, height: int, width: int) -> None: + """Resize the virtual pad and change internal variables to reflect that + + :param height: New virtual height + :type height: int + + :param width: New virtual width + :type width: int """ - Clear all contents of pad and parent window + self.v_height = height + self.v_width = width + self.__reset_draw_dimensions() + self._pad.resize(self.v_height, self.v_width) + + def __reset_draw_dimensions(self: Pane) -> None: + p_height, p_width = self.parent.getmaxyx() + self.d_height = min(self.v_height, p_height - 1) + self.d_width = min(self.v_width, p_width - 1) + + def clear(self: Pane) -> None: + """Clear all contents of pad and parent window + + .. warning:: + + This should only be used if an event changing the entire pane + occurs. If used on every cycle, a flickering effect will occur, + due to the slowness of the operation. """ self._pad.clear() self.parent.clear() - self.needs_refresh = False + # self.refresh() - @abstractmethod - def add(self: Pane, item: any) -> None: + def clear_line(self: Pane, y: int, style: any = None) -> None: + """Clears a single line of the Pane + + :param y: The line to clear + :type y: int + + :param style: The background color to set when clearing the line + :type style: int """ - Abstract add method, must be overwritten in child class - Child class should handle retrieving and storing added items - to be drawn by add method + line_style = style or self._style + self._pad.move(y, 1) + # self._pad.addstr(y, 1, ' ' * (self.d_width - 2), curses.COLOR_BLUE) + self._pad.attron(line_style) + self._pad.clrtoeol() + self._pad.attroff(line_style) + + def refresh(self: Pane) -> None: + """Refresh the pane based on configured draw dimensions """ - ... + self._pad.refresh(self.scroll_position_y, + self.scroll_position_x, + self.y, + self.x, + self.y + self.d_height, + self.x + self.d_width) + self.needs_refresh = False def scroll_up(self: Pane, rate: int = 1) -> bool: - """ - Scroll pad upwards + """Scroll pad upwards + + .. note:: - Arguments - ---------- - rate: number of lines to scroll up by + Scroll limit must be set by child class - Returns - -------- - bool: Indication of whether a limit was reached. False indicates a - limit was reached and the pane cannot be scrolled further in that - direction + :param rate: Number of lines to scroll by + :type rate: int + + :return: Indication of whether a limit was reached. False indicates a + limit was reached and the pane cannot be scrolled further in that + direction + :rtype: bool """ self.scroll_position_y -= rate if self.scroll_position_y < 0: @@ -96,20 +170,20 @@ def scroll_up(self: Pane, rate: int = 1) -> bool: return False return True - def scroll_down(self: Pane, rate: int = 1) -> None: - """ - Scroll pad downwards + def scroll_down(self: Pane, rate: int = 1) -> bool: + """Scroll pad downwards - Note: scroll limit must be handled by child class + .. note:: - Arguments - ---------- - rate: number of lines to scroll down by + Scroll limit must be set by child class - Returns - -------- - bool: Indication of whether a limit was reached. False indicates a - limit was reached and the pane cannot be scrolled further in that + :param rate: Number of lines to scroll by + :type rate: int + + :return: Indication of whether a limit was reached. False indicates a + limit was reached and the pane cannot be scrolled further in that + direction + :rtype: bool """ self.scroll_position_y += rate if self.scroll_position_y > self.scroll_limit_y: @@ -117,39 +191,41 @@ def scroll_down(self: Pane, rate: int = 1) -> None: return False return True - def scroll_left(self: Pane, rate: int = 1): - """ - Scroll pad left + def scroll_left(self: Pane, rate: int = 1) -> bool: + """Scroll pad left - Arguments - ---------- - rate: number of cols to scroll left by + .. note:: - Returns - -------- - bool: Indication of whether a limit was reached. False indicates a - limit was reached and the pane cannot be scrolled further in that + Scroll limit must be set by child class + + :param rate: Number of lines to scroll by + :type rate: int + + :return: Indication of whether a limit was reached. False indicates a + limit was reached and the pane cannot be scrolled further in that + direction + :rtype: bool """ self.scroll_position_x -= rate - if self.scroll_position_x < 0: + if(self.scroll_position_x < 0): self.scroll_position_x = 0 return False return True - def scroll_right(self: Pane, rate: int = 1): - """ - Scroll pad right + def scroll_right(self: Pane, rate: int = 1) -> bool: + """Scroll pad right - Note: scroll limit must be handled by child class + .. note:: - Arguments - ---------- - rate: number of cols to scroll right by + Scroll limit must be set by child class - Returns - -------- - bool: Indication of whether a limit was reached. False indicates a - limit was reached and the pane cannot be scrolled further in that + :param rate: Number of lines to scroll by + :type rate: int + + :return: Indication of whether a limit was reached. False indicates a + limit was reached and the pane cannot be scrolled further in that + direction + :rtype: bool """ self.scroll_position_x += rate if self.scroll_position_x > self.scroll_limit_x: @@ -157,205 +233,55 @@ def scroll_right(self: Pane, rate: int = 1): return False return True + def add_line(self: Pane, + y: int, + x: int, + line: str, + bold: bool = False, + underline: bool = False, + highlight: bool = False, + color: any = None) -> None: + """Adds a line of text to the Pane and if needed, it handles the + process of resizing the embedded pad -class CANMsgPane(Pane): - """ - Displays a Table of CAN messages inside of a pane - """ + :param y: Line's row position + :type y: int - def __init__(self: CANMsgPane, name: str, parser: CANOpenParser, - capacity: int = None, fields: dict = None, frame_types: list - = None): - """ - CANMsgPane Initialization - - Arguments - ---------- - name: string title displayed on parent window - parser: CANOpenParser used to generated parsed messages - capacity: Maximum number of records to display in pane - fields: ordered dictionary of fields to display in output (layout.json) - frame_types: list of frame types to display in table (layout.json) - """ - super().__init__() - if fields is None: - fields = {} - - if frame_types is None: - frame_types = [] - - self._name = name - self._parser = parser - self._cols = {} - self.table = CANMsgTable(capacity=capacity) - self.__top = 0 - - # set width to column + 2 padding for each field - for field in fields: - self._cols[field] = [fields[field], len(field) + 2] - self.v_width += self._cols[field][1] - - # Turn the frame-type strings into enumerations - self.frame_types = [] - for ft in frame_types: - self.frame_types.append(MessageType[ft]) - - def draw(self: CANMsgPane) -> None: - """ - Draw all records from the CANMsgTable to the Pane's Pad - and any relevent information to the Pane's parent window - Pane scrolling and refreshing are implemented here as well - """ - super().draw() - height, width = self.parent.getmaxyx() - y_offset, x_offset = self.parent.getbegyx() - - self.v_height = len(self.table) + 50 - self.v_height = height if self.v_height < height else self.v_height - self.scroll_limit_y = len(self.table) - 1 - - self.v_width = width if self.v_width < width else self.v_width - self.scroll_limit_x = self.v_width - 2 - - self._pad.resize(self.v_height - 1, self.v_width) - - # Update parent - out_of = '/{}'.format(self.table.capacity) \ - if self.table.capacity is not None else '' - banner = '{} ({}{})'.format(self._name, - len(self.table), - out_of) - - if self.selected: - self.parent.attron(self._style | curses.A_REVERSE) - - self.parent.addstr(0, 1, banner) - - self.parent.attroff(self._style | curses.A_REVERSE) - - # Add fields header or directions to add fields - if len(self._cols) == 0: - if self.v_height < 2: - self.v_height = 2 - - self.__add_line(1, 1, "No fields added for this pane!", bold=True) - self.__add_line(2, 1, "Add fields in " - "~/.config/canopen-monitor/layout.json", - bold=True) - - self._pad.attroff(self._style) - - for i, arb_id in enumerate(self.table): - msg = self.table[arb_id] - attributes = dir(msg) - line = "" - for col in self._cols.values(): - if col[0] in attributes: - if col[0] == 'arb_id': - value = hex(msg.arb_id) - else: - value = str(getattr(msg, col[0])) - else: - value = "Not Found" - - line += value.ljust(col[1], ' ') - - is_selected = self.selected and self.scroll_position_y == i - self.__add_line(i + 1, 1, line, selected=is_selected) - - # Don't Scroll down until after scrolling past last item - if self.scroll_position_y - (height - 4) > self.__top: - self.__top = self.scroll_position_y - (height - 4) - if self.scroll_position_y < self.__top: - self.__top = self.scroll_position_y - - # Don't allow for for pad to be seen past v_width - if self.scroll_position_x + width > self.v_width: - self.scroll_position_x = self.v_width - width - - scroll_offset_x = self.scroll_position_x - self.__draw_header(self.__top, 1) - - self.parent.refresh() - self._pad.refresh(self.__top, - scroll_offset_x, - y_offset + 1, - x_offset + 1, - y_offset + height - 2, - x_offset + width - 2) - - def __add_line(self: CANMsgPane, y: int, x: int, line: str = "", - bold: bool = False, selected: bool = False) -> None: - """ - Add line of text to the Pane's pad - Handles resizing and pad attributes - - Arguments - ---------- - y: Pad's row to start writing to - x: Pad's col to start writing to - line: string of text to write - bold: boolean indicator of whether to bold line - selected: boolean indicator of whether to mark line selected - """ - # Widen pad when line length is larger than current v_width - if len(line) + 2 > self.v_width: - self.v_width = len(line) + 2 - self._pad.resize(self.v_height - 1, self.v_width) + :param x: Line's collumn position + :type x: int - if bold: - self._pad.attron(self._style | curses.A_BOLD) - if selected: - self._pad.attron(self._style | curses.A_REVERSE) + :param line: Text to write to the Pane + :type line: str - self._pad.addstr(y, x, line) + :param bold: A style option to bold the line written + :type bold: bool - if bold: - self._pad.attroff(self._style | curses.A_BOLD) - if selected: - self._pad.attroff(self._style | curses.A_REVERSE) + :param highlight: A syle option to highlight the line writte + :type highlight: bool - def __draw_header(self: CANMsgPane, y: int, x: int) -> None: - """ - Draw the table header to the top of the pane - """ - line = "" - for col in self._cols: - line += col.ljust(self._cols[col][1], ' ') - - self.__add_line(y, x, line, bold=True) - - def add(self: CANMsgPane, msg: CANMsg) -> None: - """ - Add provided message to the can msg table - and update column widths as required - - Arguments - ---------- - msg: CanMsg to be added - """ - super().add(msg) - msg.parsed_msg = self._parser.parse(msg)[0] - self.table += msg - - attributes = dir(msg) - for col in self._cols.values(): - if col[0] in attributes: - value = str(getattr(msg, col[0])) - else: - value = "Not Found" - - col[1] = len(value) + 2 if len(value) + 2 > col[1] else col[1] - - def has_frame_type(self: CANMsgPane, frame: CANMsg) -> bool: - """ - Determine if CANMsg type is handled by this frame - Arguments - ---------- - frame: CANMsg to check type of - - Returns - -------- - bool: Indicator if this pane accepts the message type of the provided message + :param style: A color option for the line + :type style: curses.style """ - return frame.message_type in self.frame_types + # Set the color option to the pane default if none was specified + line_style = color or self._style + + # Widen pad when necessary + new_width = len(line) + x + if(new_width > self.v_width): + self.resize(self.v_height, new_width) + + # Heighten the pad when necessary + if(y > self.v_height): + self.resize(y + 1, self.v_width) + + # Add style options + if(bold): + line_style |= curses.A_BOLD + if(highlight): + line_style |= curses.A_REVERSE + if(underline): + line_style |= curses.A_UNDERLINE + + # Add the line + if(y < self.d_height): + self._pad.addstr(y, x, line, line_style) diff --git a/canopen_monitor/ui/windows.py b/canopen_monitor/ui/windows.py index 8e931d6..6d6dc54 100755 --- a/canopen_monitor/ui/windows.py +++ b/canopen_monitor/ui/windows.py @@ -1,40 +1,92 @@ +from __future__ import annotations import curses +from .pane import Pane -class PopupWindow: - def __init__(self, parent, message, banner='fatal', color_pair=3): - height, width = parent.getmaxyx() - style = curses.color_pair(color_pair) | curses.A_REVERSE - any_key_message = "Press any key to continue..." - message = message.split('\n') - long = len(any_key_message) - - for m in message: - if(len(m) > long): - long = len(m) - if(long < len(banner)): - long = len(banner) - long += 1 - - window = curses.newwin(len(message) + 2, - long + 2, - int((height - len(message) + 2) / 2), - int((width - long + 2) / 2)) - window.attron(style) - for i, m in enumerate(message): - window.addstr(1 + i, 1, m.ljust(long, ' ')) - window.box() - window.addstr(0, 1, banner + ":", curses.A_UNDERLINE | style) - window.addstr(len(message) + 1, - long - len(any_key_message), - any_key_message) - - window.attroff(style) - - window.refresh() - parent.refresh() - - window.getch() - curses.flushinp() - window.clear() - parent.clear() +class PopupWindow(Pane): + def __init__(self: PopupWindow, + parent: any, + header: str = 'Alert', + content: [str] = [], + footer: str = 'ESC: close', + style: any = None): + super().__init__(parent=(parent or curses.newpad(0, 0)), + height=1, + width=1, + y=10, + x=10) + # Pop-up window properties + self.header = header + self.content = content + self.footer = footer + self.enabled = False + + # Parent window dimensions (Usually should be STDOUT directly) + p_height, p_width = self.parent.getmaxyx() + + # Break lines as necessary + self.content = self.break_lines(int(2 * p_width / 3), self.content) + + # UI dimensions + p_height, p_width = self.parent.getmaxyx() + self.v_height = (len(self.content)) + 2 + width = len(self.header) + 2 + if(len(self.content) > 0): + width = max(width, max(list(map(lambda x: len(x), self.content)))) + self.v_width = width + 4 + self.y = int(((p_height + self.v_height) / 2) - self.v_height) + self.x = int(((p_width + self.v_width) / 2) - self.v_width) + + # UI properties + self.style = (style or curses.color_pair(0)) + self._pad.attron(self.style) + + def break_lines(self: PopupWindow, + max_width: int, + content: [str]) -> [str]: + # Determine if some lines of content need to be broken up + for i, line in enumerate(content): + length = len(line) + mid = int(length / 2) + + if(length >= max_width): + # Break the line at the next available space + for j, c in enumerate(line[mid - 1:]): + if(c == ' '): + mid += j + break + + # Apply the line break to the content array + content.pop(i) + content.insert(i, line[:mid - 1]) + content.insert(i + 1, line[mid:]) + return content + + def toggle(self: PopupWindow) -> bool: + self.enabled = not self.enabled + return self.enabled + + def __draw_header(self: PopupWindow) -> None: + self.add_line(0, 1, self.header, underline=True) + + def __draw__footer(self: PopupWindow) -> None: + f_width = len(self.footer) + 2 + self.add_line(self.v_height - 1, + self.v_width - f_width, + self.footer, + underline=True) + + def draw(self: PopupWindow) -> None: + if(self.enabled): + super().resize(self.v_height, self.v_width) + super().draw() + self.__draw_header() + + for i, line in enumerate(self.content): + self.add_line(1 + i, 2, line) + + self.__draw__footer() + super().refresh() + else: + # super().clear() + ... diff --git a/canopen_monitor/utilities.py b/canopen_monitor/utilities.py deleted file mode 100755 index 68f20aa..0000000 --- a/canopen_monitor/utilities.py +++ /dev/null @@ -1,82 +0,0 @@ -import os.path -import json -import canopen_monitor - - -def generate_dirs(exist_ok: bool = True): - os.makedirs(canopen_monitor.CONFIG_DIR, exist_ok=exist_ok) - os.makedirs(canopen_monitor.CACHE_DIR, exist_ok=exist_ok) - os.makedirs(canopen_monitor.EDS_DIR, exist_ok=exist_ok) - - -def load_config(filename: str): - """Loads a pre-existing json file - - Returns - ----- - """ - file = open(os.path.expanduser(filename)) - raw_data = file.read() - file.close() - return json.loads(raw_data) - - -def config_factory(filepath: str): - '''Generate the default configs''' - if(filepath == canopen_monitor.DEVICES_CONFIG): - data = { - 'devices': ['can0'], - 'stale_timeout': 60, - 'dead_timeout': 120 - } - elif(filepath == canopen_monitor.NODES_CONFIG): - data = {0x40: "MDC"} - elif(filepath == canopen_monitor.LAYOUT_CONFIG): - data = { - 'type': 'grid', - 'split': 'horizontal', - 'data': [{ - 'type': 'grid', - 'split': 'vertical', - 'data': [{ - 'type': 'heartbeat_table', - 'capacity': None, - 'name': 'Hearbeats', - 'fields': [], - 'frame_types': ['HEARTBEAT'] - }, { - 'type': 'info_table', - 'capacity': None, - 'name': 'Info', - 'fields': [], - 'frame_types': [] - }] - }, { - 'type': 'misc_table', - 'capacity': None, - 'name': 'Misc', - 'fields': [], - 'frame_types': [ - 'NMT', - 'SYNC', - 'TIME', - 'EMER', - 'PDO1_TX', - 'PDO1_RX', - 'PDO2_TX', - 'PDO2_RX', - 'PDO3_TX', - 'PDO3_RX', - 'PDO4_TX', - 'PDO4_RX', - 'SDO_TX', - 'SDO_RX', - 'UKNOWN' - ] - }]} - else: - data = {} - - file = open(os.path.expanduser(filepath), 'w+') - file.write(json.dumps(data, sort_keys=True, indent=4) + '\n') - file.close() diff --git a/docs/canopen_monitor.canmsgs.rst b/docs/canopen_monitor.canmsgs.rst deleted file mode 100644 index 32efe28..0000000 --- a/docs/canopen_monitor.canmsgs.rst +++ /dev/null @@ -1,37 +0,0 @@ -canopen\_monitor.canmsgs package -================================ - -Submodules ----------- - -canopen\_monitor.canmsgs.canmsg module --------------------------------------- - -.. automodule:: canopen_monitor.canmsgs.canmsg - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.canmsgs.canmsg\_table module ---------------------------------------------- - -.. automodule:: canopen_monitor.canmsgs.canmsg_table - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.canmsgs.magic\_can\_bus module ------------------------------------------------ - -.. automodule:: canopen_monitor.canmsgs.magic_can_bus - :members: - :undoc-members: - :show-inheritance: - -Module contents ---------------- - -.. automodule:: canopen_monitor.canmsgs - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/canopen_monitor.parser.rst b/docs/canopen_monitor.parser.rst deleted file mode 100644 index 8732c21..0000000 --- a/docs/canopen_monitor.parser.rst +++ /dev/null @@ -1,85 +0,0 @@ -canopen\_monitor.parser package -=============================== - -Submodules ----------- - -canopen\_monitor.parser.canopen module --------------------------------------- - -.. automodule:: canopen_monitor.parser.canopen - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.parser.eds module ----------------------------------- - -.. automodule:: canopen_monitor.parser.eds - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.parser.emcy module ------------------------------------ - -.. automodule:: canopen_monitor.parser.emcy - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.parser.hb module ---------------------------------- - -.. automodule:: canopen_monitor.parser.hb - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.parser.pdo module ----------------------------------- - -.. automodule:: canopen_monitor.parser.pdo - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.parser.sdo module ----------------------------------- - -.. automodule:: canopen_monitor.parser.sdo - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.parser.sync module ------------------------------------ - -.. automodule:: canopen_monitor.parser.sync - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.parser.time module ------------------------------------ - -.. automodule:: canopen_monitor.parser.time - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.parser.utilities module ----------------------------------------- - -.. automodule:: canopen_monitor.parser.utilities - :members: - :undoc-members: - :show-inheritance: - -Module contents ---------------- - -.. automodule:: canopen_monitor.parser - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/canopen_monitor.rst b/docs/canopen_monitor.rst deleted file mode 100644 index 6608228..0000000 --- a/docs/canopen_monitor.rst +++ /dev/null @@ -1,39 +0,0 @@ -canopen\_monitor package -======================== - -Subpackages ------------ - -.. toctree:: - :maxdepth: 4 - - canopen_monitor.canmsgs - canopen_monitor.parser - canopen_monitor.ui - -Submodules ----------- - -canopen\_monitor.monitor\_app module ------------------------------------- - -.. automodule:: canopen_monitor.monitor_app - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.utilities module ---------------------------------- - -.. automodule:: canopen_monitor.utilities - :members: - :undoc-members: - :show-inheritance: - -Module contents ---------------- - -.. automodule:: canopen_monitor - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/canopen_monitor.ui.rst b/docs/canopen_monitor.ui.rst deleted file mode 100644 index 4a2a3cb..0000000 --- a/docs/canopen_monitor.ui.rst +++ /dev/null @@ -1,37 +0,0 @@ -canopen\_monitor.ui package -=========================== - -Submodules ----------- - -canopen\_monitor.ui.grid module -------------------------------- - -.. automodule:: canopen_monitor.ui.grid - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.ui.pane module -------------------------------- - -.. automodule:: canopen_monitor.ui.pane - :members: - :undoc-members: - :show-inheritance: - -canopen\_monitor.ui.windows module ----------------------------------- - -.. automodule:: canopen_monitor.ui.windows - :members: - :undoc-members: - :show-inheritance: - -Module contents ---------------- - -.. automodule:: canopen_monitor.ui - :members: - :undoc-members: - :show-inheritance: diff --git a/docs/conf.py b/docs/conf.py index c02bf82..558a11f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -18,7 +18,7 @@ # -- Project information ----------------------------------------------------- project = 'CANOpen Monitor' -copyright = '2020, Portland State Aerospace Society' +copyright = '2021, Portland State Aerospace Society' author = 'Portland State Aerospace Society' @@ -30,7 +30,8 @@ extensions = [ 'sphinx.ext.todo', 'sphinx.ext.viewcode', - 'sphinx.ext.autodoc' + 'sphinx.ext.autodoc', + 'sphinx.ext.autosectionlabel' ] # Add any paths that contain templates here, relative to this directory. @@ -52,4 +53,4 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = [] diff --git a/docs/development/can.rst b/docs/development/can.rst new file mode 100644 index 0000000..9438da3 --- /dev/null +++ b/docs/development/can.rst @@ -0,0 +1,7 @@ +CAN Module +========== + +.. automodule:: canopen_monitor.can + :members: + :private-members: + :undoc-members: diff --git a/docs/development/index.rst b/docs/development/index.rst new file mode 100644 index 0000000..435b4e1 --- /dev/null +++ b/docs/development/index.rst @@ -0,0 +1,47 @@ +Development Reference +===================== + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + can + parse + ui + +Node Ranges to Types Map +======================== + ++----------------+--------+---------+ +|Type |Range In|Range Out| ++----------------+--------+---------+ +|NMT node control|0 | | ++----------------+--------+---------+ +|SYNC |0x080 | | ++----------------+--------+---------+ +|Emergency |0x80 |0x100 | ++----------------+--------+---------+ +|Time Stamp |100 | | ++----------------+--------+---------+ +|PDO1 tx |0x180 |0x200 | ++----------------+--------+---------+ +|PDO1 rx |0x200 |0x280 | ++----------------+--------+---------+ +|PDO2 tx |0x280 |0x300 | ++----------------+--------+---------+ +|PDO2 rx |0x300 |0x380 | ++----------------+--------+---------+ +|PDO3 tx |0x380 |0x400 | ++----------------+--------+---------+ +|PDO3 rx |0x400 |0x480 | ++----------------+--------+---------+ +|PDO4 tx |0x480 |0x500 | ++----------------+--------+---------+ +|PDO4 rx |0x500 |0x580 | ++----------------+--------+---------+ +|SDO tx |0x580 |0x600 | ++----------------+--------+---------+ +|SDO rx |0x600 |0x680 | ++----------------+--------+---------+ +|Heartbeats |0x700 |0x7FF | ++----------------+--------+---------+ diff --git a/docs/development/parse.rst b/docs/development/parse.rst new file mode 100644 index 0000000..506f407 --- /dev/null +++ b/docs/development/parse.rst @@ -0,0 +1,7 @@ +Parse Module +============ + +.. automodule:: canopen_monitor.parse + :members: + :private-members: + :undoc-members: diff --git a/docs/development/ui.rst b/docs/development/ui.rst new file mode 100644 index 0000000..c6a65f8 --- /dev/null +++ b/docs/development/ui.rst @@ -0,0 +1,7 @@ +UI Module +========= + +.. automodule:: canopen_monitor.ui + :members: + :private-members: + :undoc-members: diff --git a/docs/glossary.rst b/docs/glossary.rst new file mode 100644 index 0000000..4352a25 --- /dev/null +++ b/docs/glossary.rst @@ -0,0 +1,87 @@ +======== +Glossary +======== + +.. glossary:: + :sorted: + + Baud Rate + The speed of messages sent over a communications bus. It is not + directly linked but typically infers the interval that messages are + sent or resent in a protocol. + + C3 + Command, communication, and control board. See + https://github.com/oresat/oresat-c3 + + CAN + Control area network. A message bus for embedded systems. + + CAN ID + CAN Identifier. This is the 11-bit CAN message identifier which is at + the beginning of every CAN message on the bus. + + CANopen + A communication protocol and device profile specification for a CAN + bus defined by CAN in Automation. More info at https://can-cia.org/ + + CFC + Cirrus Flux Camera. One of OreSat1 payloads and a Linux board. + + COB ID + Communication object identifier. + + CubeSat + A CubeSat is small satellite is made up of multiples of 10cm × 10cm × + 10cm cubic units + + Daemon + A long running process on Linux, which runs in the background. + + DLC + Data Length Code. The operational code dictating the size of the data + frame. + + EDS + Electronic Data Sheet. This is an INI style or XML style formatted file. + + NCurses + New Curses. An application programming interface for manipulating the + standard terminal. Used for making terminal-based applications without + the need for a GUI. + + MTU + Maximum Transmission Unit. The maximum size of a packet. In context of + this application, the MTU of a CAN packet is 108 bytes for a + maximum-data-frame size of 64 bits (8 bytes). + + OreSat + PSAS's open source CubeSat project. See their + `homepage `_ for more details. + + OreSat0 + A 1U cube-satellite made and maintained by OreSat. + + OreSat1: + A 2U cube-satellite made and maintained by OreSat. + + OLM + OreSat Linux Manager. The front end daemon for all OreSat Linux boards. + It converts CANopen message into DBus messages and vice versa. See + https://github.com/oresat/oresat-linux-manager + + PDO + Process Data Object. Inputs and outputs. Values of type rotational + speed, voltage, frequency, electric current, etc. + + PSAS + Portland State Aerosapce Society. A student aerospace group at + Portland State University. See https://www.pdxaerospace.org/ + + SDO + Service Data Object. Configuration settings, possibly node ID, baud + rate, offset, gain, etc. + + SDR + Software Define Radio. Radio communications that are traditionally + implemented in hardware are instead implemented in software. diff --git a/docs/index.rst b/docs/index.rst index 32a0271..f1b0ae6 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -1,20 +1,33 @@ -.. CANOpen Monitor documentation master file, created by - sphinx-quickstart on Tue Dec 1 15:57:30 2020. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. - +=========================================== Welcome to CANOpen Monitor's documentation! =========================================== +CANOpen-Monitor is an NCurses-based TUI application for tracking activity +over the CAN bus and decoding messages with provided EDS/OD files. + +.. warning:: + This is still a work in progress. + .. toctree:: :maxdepth: 2 :caption: Contents: - modules + development/index -Indices and tables -================== +Glossary and Terms +------------------ + +.. toctree:: + :maxdepth: 2 + + glossary + +Index +===== * :ref:`genindex` * :ref:`modindex` * :ref:`search` + +.. _OreSat Website: https://www.oresat.org/ +.. _OreSat GitHub: https://github.com/oresat diff --git a/docs/modules.rst b/docs/modules.rst deleted file mode 100644 index eea4fd9..0000000 --- a/docs/modules.rst +++ /dev/null @@ -1,7 +0,0 @@ -canopen_monitor -=============== - -.. toctree:: - :maxdepth: 4 - - canopen_monitor diff --git a/docs/node_id_ranges.txt b/docs/node_id_ranges.txt deleted file mode 100644 index 1e49843..0000000 --- a/docs/node_id_ranges.txt +++ /dev/null @@ -1,15 +0,0 @@ -0: NMT node control -0x080: SYNC -0x80 - 0x100: Emergency -100: Time Stamp -0x180 - 0x200: PDO1 tx -0x200 - 0x280: PDO1 rx -0x280 - 0x300: PDO2 tx -0x300 - 0x380: PDO2 rx -0x380 - 0x400: PDO3 tx -0x400 - 0x480: PDO3 rx -0x480 - 0x500: PDO4 tx -0x500 - 0x580: PDO4 rx -0x580 - 0x600: SDO tx -0x600 - 0x680: SDO rx -0x700 - 0x7FF: Heartbeats diff --git a/scripts/dev_bus_setup.sh b/scripts/dev_bus_setup.sh deleted file mode 100755 index ce7fe5d..0000000 --- a/scripts/dev_bus_setup.sh +++ /dev/null @@ -1,20 +0,0 @@ -#!/usr/bin/env bash - -BUS_LIST_PATH=~/.config/canopen-monitor/devices.json -BUS_LIST=$(cat $BUS_LIST_PATH | cut -d'[' -f1 | cut -d']' -f1 | sed 's/,//g' | xargs) - -for bus in ${BUS_LIST[@]}; do - sudo ip link add $bus type vcan > /dev/null 2>&1 - if [[ $? == 0 ]]; then - echo -e "Created bus: $bus" - else - echo -e "Failed to create bus: $bus" - fi - - sudo ip link set $bus up > /dev/null 2>&1 - if [[ $? == 0 ]]; then - echo -e "Set bus status to UP" - else - echo -e "Failed to set bus status to UP" - fi -done diff --git a/scripts/dev_cangen.sh b/scripts/dev_cangen.sh deleted file mode 100755 index 936a536..0000000 --- a/scripts/dev_cangen.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/usr/bin/env bash - -BUS_LIST_PATH=~/.config/canopen-monitor/devices.json -BUS_LIST=$(cat $BUS_LIST_PATH | cut -d'[' -f1 | cut -d']' -f1 | sed 's/,//g' | xargs) -PID_LIST=() - -for bus in ${BUS_LIST[@]}; do - cangen $bus & - PID_LIST+=($!) -done - -echo -e "Spawned test device generators: (${BUS_LIST[@]}):(${PID_LIST[@]})" - -while [[ 1 ]]; do - read -p '> ' input - input=$(echo $input | xargs) - - if [[ $input == "q" ]] || [[ $input == 'quit' ]]; then - for pid in ${PID_LIST[@]}; do - kill -9 $pid - done - - exit 0 - elif [[ $input =~ 'kill' ]]; then - ID=$(echo $input | cut -d' ' -f2 | xargs) - echo "Killing PID $ID..." - kill -9 $ID - fi -done diff --git a/scripts/socketcan-dev.py b/scripts/socketcan-dev.py new file mode 100644 index 0000000..3d2000d --- /dev/null +++ b/scripts/socketcan-dev.py @@ -0,0 +1,162 @@ +#!/usr/bin/env python3 +import can +import time +import random +import argparse +import subprocess + +_FAILURE_STRING = 'Device "{}" does not exist.' +_BUS_TYPE = 'socketcan' + + +def create_vdev(name: str) -> bool: + rc_create = subprocess.call(['sudo', 'ip', 'link', 'add', + 'dev', name, 'type', 'vcan']) + created = rc_create == 0 or rc_create == 2 + + if(created): + rc_netup = subprocess.call(['sudo', 'ip', 'link', 'set', name, 'up']) + netup = rc_netup == 0 or rc_netup == 2 + else: + netup = False + + return created and netup + + +def destroy_vdev(name: str) -> bool: + rc_destroy = subprocess.call(['sudo', 'ip', 'link', 'del', 'dev', name]) + destroyed = rc_destroy == 0 or rc_destroy == 1 + return destroyed + + +def send(channel: str, id: int, message: [int]): + """:param id: Spam the bus with messages including the data id.""" + bus = can.interface.Bus(channel=channel, bustype=_BUS_TYPE) + msg = can.Message(arbitration_id=id, + data=message, + is_extended_id=False) + bus.send(msg) + + +def send_handle(args: dict, up: [str]): + for i, c in enumerate(args.channels): + if(up[i]): + id = args.id + i + send(c, id, args.message) + msg_str = ' '.join(list(map(lambda x: hex(x).upper()[2:] + .rjust(2, '0'), + args.message))) + print(f'[{time.ctime()}]:'.ljust(30, ' ') + + f'{c}'.ljust(8, ' ') + + f'{hex(id)}'.ljust(10, ' ') + + f'{msg_str}'.ljust(25, ' ')) + + +def send_handle_cycle(args: [any], up: [any]) -> None: + # Regenerate the random things if flagged to do so + if(args.random_id): + args.id = random.randint(0x0, 0x7ff) + if(args.random_message): + args.message = [random.randint(0, 255) for _ in range(8)] + + # Send the things + send_handle(args, up) + time.sleep(args.delay) + + +def main(): + parser = argparse.ArgumentParser(prog='socketcan-dev', + description='A simple SocketCan wrapper' + ' for testing' + ' canopen-monitor', + allow_abbrev=False) + parser.add_argument('-c', '--channels', + type=str, + nargs="+", + default=['vcan0'], + help='The channel to create and send CAN messages on') + parser.add_argument('-d', '--delay', + type=float, + default=1, + help='Adjust the message-send delay time, used in' + ' conjunction with `-r`') + parser.add_argument('-i', '--id', + type=str, + default='10', + help='The COB ID to use for the messages') + parser.add_argument('-n', '--no-destroy', + dest='destroy', + action='store_false', + default=True, + help='Stop socketcan-dev from destroying the channel' + ' at the end of life') + parser.add_argument('-m', '--message', + type=str, + nargs=8, + default=['0', '0', '0', '1', '3', '1', '4', '1'], + help='The 7 bytes to send as the CAN message') + parser.add_argument('-r', '--repeat', + dest='repeat', + nargs='?', + type=int, + const=-1, + default=None, + help='Repeat sending the message N times, every so' + ' often defined by -d, used in conjunction with' + ' `-d`') + parser.add_argument('--random-id', + dest='random_id', + action='store_true', + default=False, + help='Use a randomly generated ID (this disables -i)') + parser.add_argument('--random-message', + dest='random_message', + action='store_true', + default=False, + help='Use a randomly generated message (this disables' + ' -m)') + args = parser.parse_args() + + # Interpret ID as hex + if(args.random_id): + args.id = random.randint(0x0, 0x7ff) + else: + args.id = int(args.id, 16) + + # Interpret message as hex + if(args.random_message): + args.message = [random.randint(0, 255) for _ in range(8)] + else: + args.message = list(map(lambda x: int(x, 16), args.message)) + + try: + up = [] + # Create the channels + for c in args.channels: + up.append(create_vdev(c)) + + # Quick-n-dirty banner + print('Timestamp:'.ljust(30, ' ') + + 'Channel'.ljust(8, ' ') + + 'COB ID'.ljust(10, ' ') + + 'Message'.ljust(25, ' ')) + print(''.ljust(73, '-')) + + # Send repeatedly in instructed to do so + if(args.repeat and args.repeat > 0): + for _ in range(args.repeat): + send_handle_cycle(args, up) + else: + while(args.repeat == -1): + send_handle_cycle(args, up) + + except KeyboardInterrupt: + print('Goodbye!') + finally: + if(args.destroy): + for channel in args.channels: + destroy_vdev(channel) + + +if __name__ == '__main__': + main() diff --git a/setup.py b/setup.py index 0fcd505..344bcbd 100755 --- a/setup.py +++ b/setup.py @@ -8,11 +8,18 @@ name=cm.APP_NAME, version=cm.APP_VERSION, author=cm.APP_AUTHOR, + maintainer=cm.MAINTAINER_NAME, + maintainer_email=cm.MAINTAINER_EMAIL, license=cm.APP_LICENSE, description=cm.APP_DESCRIPTION, long_description=long_description, long_description_content_type='text/markdown', url=cm.APP_URL, + project_urls={ + 'Documentation': 'https://canopen-monitor.readthedocs.io', + 'Bug Tracking': 'https://github.com/oresat/CANopen-monitor/issues?q=is' + '%3Aopen+is%3Aissue+label%3Abug' + }, packages=setuptools.find_packages(), classifiers=[ "Programming Language :: Python :: 3", @@ -25,26 +32,25 @@ "Topic :: System :: Networking :: Monitoring :: Hardware Watchdog" ], install_requires=[ - "pyvit", - "python-dateutil", - "ConfigArgParse", - "canopen" + "pyvit >= 0.2.1", + "psutil >= 5.8.0", + "python-dateutil >= 2.8.1" ], extras_require={ "dev": [ + "python-can", "setuptools", "wheel", - "pytest", "flake8", "twine", "sphinx", "sphinx_rtd_theme", ] }, - python_requires='>=3.8.5', + python_requires='>=3.9.0', entry_points={ "console_scripts": [ - '{} = canopen_monitor.__main__:main'.format(cm.APP_NAME), + f'{cm.APP_NAME} = canopen_monitor.__main__:main' ] } ) diff --git a/tests/can_monitor_spec.py b/tests/can_monitor_spec.py deleted file mode 100755 index e69de29..0000000 diff --git a/tests/canopen_parser_spec.py b/tests/canopen_parser_spec.py deleted file mode 100755 index e69de29..0000000 diff --git a/tests/magic_can_bus_spec.py b/tests/magic_can_bus_spec.py deleted file mode 100755 index 81d96c3..0000000 --- a/tests/magic_can_bus_spec.py +++ /dev/null @@ -1,33 +0,0 @@ -import unittest -import canopen_monitor.canmsgs.magic_can_bus as mcb - - -class TestMagicCanBus(unittest.TestCase): - def setUp(self): - """ - Setup the CAN Bus listners - """ - self.bus = mcb.MagicCANBus() - - def tearDown(self): - """ - Ensure the CAN Bus listeners are shut down - """ - self.bus.stop_all() - - def test_start_device_good(self): - """ - Test that `listen()` correctly creates a listener for `vcan0` - """ - self.skipTest('Revisit upon figuring out mock sockets.') - dev = 'vcan0' - self.bus.start(dev) - dev_names = list(map(lambda x: x.ndev, self.bus.running())) - self.assertIn(dev, dev_names) - - def test_receive_is_empty_no_bus(self): - """ - Test that the Magic Can Bus returns None when no bus is initialized and no messages exist on the CAN bus. - """ - res = self.bus.receive() - self.assertIsNone(res) diff --git a/tests/monitor_app_spec.py b/tests/monitor_app_spec.py deleted file mode 100755 index d553b86..0000000 --- a/tests/monitor_app_spec.py +++ /dev/null @@ -1,23 +0,0 @@ -import unittest -# import canopen_monitor -# import canopen_monitor.monitor_app as monitor_app - - -class TestMonitorApp(unittest.TestCase): - def setUp(self): - """ - Setup Monitor App. - """ - pass - - def tearDown(self): - """ - Tear down Monitor App. - """ - pass - - def test_todo_write_monitor_app_tests(self): - """ - Write Monitor App unit tests! - """ - self.skipTest("Write Monitor App unit tests some time later.") diff --git a/tests/eds_parser_spec.py b/tests/spec_eds_parser.py old mode 100755 new mode 100644 similarity index 83% rename from tests/eds_parser_spec.py rename to tests/spec_eds_parser.py index 8f14d6a..6d9ccd0 --- a/tests/eds_parser_spec.py +++ b/tests/spec_eds_parser.py @@ -1,14 +1,12 @@ import unittest -import canopen_monitor.parser.eds as eds +import canopen_monitor.parse.eds as eds from unittest.mock import mock_open, patch - -from canopen_monitor import ASSETS_DIR from tests import TEST_EDS class TestEDS(unittest.TestCase): def setUp(self): - with patch('builtins.open', mock_open(read_data=TEST_EDS)) as m: + with patch('builtins.open', mock_open(read_data=TEST_EDS)) as _: self.eds = eds.load_eds_file("star_tracker_OD.eds") def test_parse_index(self): @@ -69,16 +67,3 @@ def test_named_sections(self): self.assertEqual("3", self.eds.mandatory_objects.supported_objects, "Error parsing Comments named section") - - def test_real_files(self): - """ - Integration test against real files - """ - self.eds = eds.load_eds_file(ASSETS_DIR + "CFC_OD.eds") - self.eds = eds.load_eds_file(ASSETS_DIR + "GPS_OD.eds") - self.eds = eds.load_eds_file(ASSETS_DIR + "live_OD.eds") - self.eds = eds.load_eds_file(ASSETS_DIR + "star_tracker_OD.eds") - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/emcy_parser_spec.py b/tests/spec_emcy_parser.py old mode 100755 new mode 100644 similarity index 80% rename from tests/emcy_parser_spec.py rename to tests/spec_emcy_parser.py index 4e60757..f2b6376 --- a/tests/emcy_parser_spec.py +++ b/tests/spec_emcy_parser.py @@ -1,6 +1,6 @@ import unittest -from canopen_monitor.parser.emcy import parse -from canopen_monitor.parser.utilities import FailedValidationError +from canopen_monitor.parse.emcy import parse +from canopen_monitor.parse.utilities import FailedValidationError class TestEMCY(unittest.TestCase): @@ -35,9 +35,4 @@ def test_EMCY_invalid_length(self): with self.assertRaises(FailedValidationError) as context: parse(0, emcy_message, 0) - self.assertEqual("Invalid EMCY message length", - str(context.exception)) - - -if __name__ == '__main__': - unittest.main() + self.assertEqual("Invalid EMCY message length", str(context.exception)) diff --git a/tests/hb_parser_spec.py b/tests/spec_hb_parser.py similarity index 80% rename from tests/hb_parser_spec.py rename to tests/spec_hb_parser.py index 88db5df..0e25fe7 100644 --- a/tests/hb_parser_spec.py +++ b/tests/spec_hb_parser.py @@ -1,9 +1,9 @@ import unittest from unittest.mock import mock_open, patch -from canopen_monitor.parser import eds -from canopen_monitor.parser.hb import parse -from canopen_monitor.parser.utilities import FailedValidationError +from canopen_monitor.parse import eds +from canopen_monitor.parse.hb import parse +from canopen_monitor.parse.utilities import FailedValidationError from tests import TEST_EDS @@ -44,9 +44,4 @@ def test_HB_Empty(self): with self.assertRaises(FailedValidationError) as context: parse(123, hb_message, self.eds) - self.assertEqual("Invalid heartbeat state detected", - str(context.exception)) - - -if __name__ == '__main__': - unittest.main() + self.assertEqual("Invalid heartbeat state detected", str(context.exception)) diff --git a/tests/spec_interface.py b/tests/spec_interface.py new file mode 100644 index 0000000..98ae780 --- /dev/null +++ b/tests/spec_interface.py @@ -0,0 +1,33 @@ +import unittest +from canopen_monitor import can +from unittest.mock import MagicMock, patch + + +class Interface_Spec(unittest.TestCase): + """Tests for the Interface serial object""" + + @patch('psutil.net_if_stats') + def setUp(self, net_if_stats): + # Override the net_if_stats function from psutil + net_if_stats.return_value = {'vcan0': {}} + + # Create a fake socket + socket = MagicMock() + socket.start.return_value = None + + # Create Interface + self.iface = can.Interface('vcan0') + self.iface.socket.close() + self.iface.socket = socket + + @unittest.skip('Cannot patch psutil module') + def test_active_loop(self): + """Given a fake socket and an Interface + When binding to the socket via a `with` block + Then the socket should bind and the interface should change to the + `UP` state and then shoud move to the `DOWN` state when exiting the + `with` block + """ + with self.iface as iface: + self.assertTrue(iface.is_up) + self.assertFalse(iface.is_up) diff --git a/tests/spec_magic_can_bus.py b/tests/spec_magic_can_bus.py new file mode 100644 index 0000000..a828487 --- /dev/null +++ b/tests/spec_magic_can_bus.py @@ -0,0 +1,62 @@ +import unittest +import threading +from canopen_monitor import can +from unittest.mock import MagicMock + + +class MagicCanBus_Spec(unittest.TestCase): + """Tests for the Magic Can Bus""" + + def setUp(self): + # Fake CAN frame + generic_frame = MagicMock() + + # Create fake interfaces + if0 = MagicMock() + if0.name = 'vcan0' + if0.is_up = True + if0.recv.return_value = generic_frame + if0.__str__.return_value = 'vcan0' + + if1 = MagicMock() + if1.name = 'vcan1' + if1.is_up = False + if1.recv.return_value = generic_frame + if1.__str__.return_value = 'vcan1' + + # Setup the bus with no interfaces and then overide with the fakes + self.bus = can.MagicCANBus([]) + self.bus.interfaces = [if0, if1] + + def test_statuses(self): + """Given an MCB with 2 fake interfaces + When calling the statuses proprty + Then, the correct array of formatted tuples should be returned + """ + statuses = self.bus.statuses + self.assertEqual(statuses, [('vcan0', True), ('vcan1', False)]) + + def test_handler(self): + """Given an MCB with 2 interfaces + When starting the bus listeners with a `with` block + And calling the bus as an itterable + Then the bus should start a separate thread and fill the queue with + frames while the bus is open and then close the threads when the bus is + closed + """ + with self.bus as bus: + for frame in bus: + self.assertIsNotNone(frame) + # Active threads should only be 1 by the end, 1 being the parent + self.assertEqual(threading.active_count(), 1) + + def test_str(self): + """Given an MCB with 2 interfaces + When calling repr() on the bus + Then it should return a correctly formated string representation + of the bus + """ + expected = 'Magic Can Bus: vcan0, vcan1, pending messages:' \ + ' 0 threads: 0, keep-alive: True' + actual = str(self.bus) + self.assertEqual(expected, actual) diff --git a/tests/pdo_parser_spec.py b/tests/spec_pdo_parser.py old mode 100755 new mode 100644 similarity index 94% rename from tests/pdo_parser_spec.py rename to tests/spec_pdo_parser.py index a869a06..4b6839e --- a/tests/pdo_parser_spec.py +++ b/tests/spec_pdo_parser.py @@ -1,8 +1,8 @@ import unittest from unittest.mock import patch, mock_open -from canopen_monitor.parser import eds -from canopen_monitor.parser.pdo import parse +from canopen_monitor.parse import eds +from canopen_monitor.parse.pdo import parse from tests import TEST_EDS @@ -62,7 +62,3 @@ def test_mpdo_with_SAM(self): self.assertEqual("Orientation orientation - 1.0", parse(0x380, pdo_message, self.eds_data), "Error on MPDO SAM Message parse") - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/sdo_parser_spec.py b/tests/spec_sdo_parser.py old mode 100755 new mode 100644 similarity index 98% rename from tests/sdo_parser_spec.py rename to tests/spec_sdo_parser.py index 23d2edc..0606fcf --- a/tests/sdo_parser_spec.py +++ b/tests/spec_sdo_parser.py @@ -1,10 +1,9 @@ import unittest -from unittest.mock import MagicMock, patch, mock_open +from unittest.mock import patch, mock_open -from canopen_monitor import ASSETS_DIR -from canopen_monitor.parser import eds -from canopen_monitor.parser.sdo import SDOParser -from canopen_monitor.parser.utilities import FailedValidationError +from canopen_monitor.parse import eds +from canopen_monitor.parse.sdo import SDOParser +from canopen_monitor.parse.utilities import FailedValidationError from tests import TEST_EDS @@ -547,7 +546,3 @@ def test_invalid_payload(self): self.assertEqual("Invalid SDO payload length, expected 8, received " "0", str(context.exception)) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/sync_parser_spec.py b/tests/spec_sync_parser.py old mode 100755 new mode 100644 similarity index 89% rename from tests/sync_parser_spec.py rename to tests/spec_sync_parser.py index 895276f..47546a1 --- a/tests/sync_parser_spec.py +++ b/tests/spec_sync_parser.py @@ -1,5 +1,5 @@ import unittest -from canopen_monitor.parser.sync import parse, FailedValidationError +from canopen_monitor.parse.sync import parse, FailedValidationError class TestSYNC(unittest.TestCase): @@ -40,7 +40,3 @@ def test_SYNC_invalid(self): sync_message = b'\x01\xFF' with self.assertRaises(FailedValidationError): parse(None, sync_message, None) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/time_parser_spec.py b/tests/spec_time_parser.py similarity index 89% rename from tests/time_parser_spec.py rename to tests/spec_time_parser.py index 088184f..ef75182 100644 --- a/tests/time_parser_spec.py +++ b/tests/spec_time_parser.py @@ -1,8 +1,8 @@ import unittest from unittest.mock import mock_open, patch -from canopen_monitor.parser import time, eds -from canopen_monitor.parser.utilities import FailedValidationError +from canopen_monitor.parse import time, eds +from canopen_monitor.parse.utilities import FailedValidationError from . import TEST_EDS @@ -42,7 +42,3 @@ def test_TIME_invalid(self): self.assertEqual("Invalid TIME message length", str(context.exception)) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/utilities_spec.py b/tests/utilities_spec.py deleted file mode 100755 index 0fe66df..0000000 --- a/tests/utilities_spec.py +++ /dev/null @@ -1,113 +0,0 @@ -import os -import shutil -import unittest -import canopen_monitor -import canopen_monitor.utilities as utils - - -class TestUtilities(unittest.TestCase): - def setUp(self): - """ - Overload canopen_monitor config paths to build a dummy environment - """ - self.skipTest("This is still in progress") - canopen_monitor.CONFIG_DIR = os.path.abspath('tests/config-env') \ - + os.sep - canopen_monitor.CONFIG_DIR = canopen_monitor.CONFIG_DIR \ - + 'devices.json' - canopen_monitor.LAYOUT_CONFIG = canopen_monitor.LAYOUT_CONFIG \ - + 'layout.json' - canopen_monitor.NODES_CONFIG = canopen_monitor.NODES_CONFIG \ - + 'nodes.json' - - utils.generate_config_dirs() - - def tearDown(self): - """ - Ensure test environment is cleared out. - """ - shutil.rmtree(canopen_monitor.CONFIG_DIR) - - def test_config_dir_generator_good(self): - """ - Test that `generate_config_dirs()` correctly generates - the config dirs needed. - """ - self.assertTrue(os.path.exists(canopen_monitor.CONFIG_DIR)) - - def test_generate_devices_good(self): - """ - Test that the config factory cad succesfully generate - the default devices config and that it can be read back. - """ - expected = ['can0'] - utils.config_factory(canopen_monitor.DEVICES_CONFIG) - config = utils.load_config(canopen_monitor.DEVICES_CONFIG) - self.assertEqual(expected, config) - - def test_generate_node_good(self): - """ - Test that the config factory cad succesfully generate - the default node name override config and that it can be read back. - """ - expected = {'64': "MDC"} - utils.config_factory(canopen_monitor.NODES_CONFIG) - config = utils.load_config(canopen_monitor.NODES_CONFIG) - self.assertEqual(expected, config) - - def test_generate_layout_good(self): - """ - Test that the config factory cad succesfully generate - the default node name override config and that it can be read back. - """ - expected = { - 'type': 'grid', - 'split': 'horizontal', - 'data': [{ - 'type': 'grid', - 'split': 'vertical', - 'data': [{ - 'type': 'table', - 'capacity': 16, - 'dead_node_timeout': 600, - 'name': 'Hearbeats', - 'stale_node_timeout': 60, - 'fields': [], - 'frame_types': ['HB'] - }, { - 'type': 'table', - 'capacity': 16, - 'dead_node_timeout': 600, - 'name': 'Info', - 'stale_node_timeout': 60, - 'fields': [], - 'frame_types': [] - }] - }, { - 'type': 'table', - 'capacity': 16, - 'dead_node_timeout': 60, - 'name': 'Misc', - 'stale_node_timeout': 600, - 'fields': [], - 'frame_types': [ - 'NMT', - 'SYNC', - 'EMCY', - 'TIME', - 'TPDO1', - 'RPDO1', - 'TPDO2', - 'RPDO2', - 'TPDO3', - 'RPDO3', - 'TPDO4', - 'RPDO4', - 'TSDO', - 'RSDO', - 'UKOWN' - ] - }]} - utils.config_factory(canopen_monitor.LAYOUT_CONFIG) - config = utils.load_config(canopen_monitor.LAYOUT_CONFIG) - self.assertEqual(expected, config)