Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Refactor invalid OD file index handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Boneill3 committed May 23, 2021
1 parent d06cc89 commit 7a4270b
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 87 deletions.
5 changes: 3 additions & 2 deletions canopen_monitor/parse/canopen.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ def parse(self, message: Message) -> str:
parsed_message = parse_function(message.arb_id,
message.data,
eds_config)
except (FailedValidationError, TypeError):
parsed_message = format_bytes(message.data)
except (FailedValidationError, TypeError) as exception:
parsed_message = f"{format_bytes(message.data)} " \
f"Parse Error: {str(exception)}"

return parsed_message
53 changes: 38 additions & 15 deletions canopen_monitor/parse/eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,16 @@ class Index:
Note: Not all possible properties are stored
"""

def __init__(self, data, sub_id=None):
def __init__(self, data, index: Union[str, int], is_sub=False):
# Determine if this is a parent index or a child index
if (sub_id is None):
self.is_parent = True
self.sub_indices = []
if not is_sub:
self.sub_indices = {}
self.index = index[2:]
else:
self.is_parent = False
self.sub_id = sub_id
self.sub_indices = None
self.index = str(index)

self.is_sub = is_sub

# Process all sub-data
for e in data:
Expand All @@ -107,18 +108,34 @@ def __init__(self, data, sub_id=None):
value = convert_value(value)

self.__setattr__(camel_to_snake(key), value)
"""
Add a subindex to an index object
:param index: The subindex being added
:type Index
:raise ValueError: A subindex has already been added a this subindex
"""
def add(self, index: Index) -> None:
if self.sub_indices.setdefault(int(index.index), index) != index:
raise ValueError

def add(self, index) -> None:
self.sub_indices.append(index)

"""
Add a subindex to an index object
:param index: The subindex being added
:type Index
:raise ValueError: A subindex has already been added a this subindex
"""
def __getitem__(self, key: int):
return list(filter(lambda x: x.sub_id == key, self.sub_indices))[0]
if key not in self.sub_indices:
raise KeyError(f"{self.index}sub{key}")

return self.sub_indices[key]

def __len__(self) -> int:
if (self.sub_indices is None):
return 1
else:
return 1 + sum(map(lambda x: len(x), self.sub_indices))
return len(self.sub_indices)
# return 1 + sum(map(lambda x: len(x), self.sub_indices))


def convert_value(value: str) -> Union[int, str]:
Expand Down Expand Up @@ -154,11 +171,13 @@ def __init__(self, eds_data: [str]):
id = section[0][1:-1].split('sub')

if all(c in string.hexdigits for c in id[0]):
index = hex(int(id[0], 16))
if len(id) == 1:
self.indices[hex(int(id[0], 16))] = Index(section[1:])
self.indices[index] = Index(section[1:], index)
else:
self.indices[hex(int(id[0], 16))] \
.add(Index(section[1:], sub_id=int(id[1], 16)))
self.indices[index] \
.add(Index(section[1:], int(id[1], 16),
is_sub=True))
else:
name = section[0][1:-1]
self.__setattr__(camel_to_snake(name),
Expand All @@ -177,7 +196,11 @@ def __len__(self) -> int:

def __getitem__(self, key: Union[int, str]) -> Index:
callable = hex if type(key) == int else str
return self.indices.get(callable(key))
key = callable(key)
if key not in self.indices:
raise KeyError(key[2:])

return self.indices[callable(key)]


def load_eds_file(filepath: str) -> EDS:
Expand Down
26 changes: 24 additions & 2 deletions canopen_monitor/parse/pdo.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import string
from math import ceil, floor
from .eds import EDS
from .utilities import FailedValidationError, get_name, decode
from .utilities import FailedValidationError, get_name, decode, format_bytes
from ..can import MessageType

PDO1_TX = 0x1A00
Expand Down Expand Up @@ -116,6 +116,10 @@ def parse_pdo(num_elements, pdo_type, cob_id, eds, data):
for j in range(1, size):
mask = mask << 1
mask += 1

# Possible exceptions from get_name are not caught because they indicate
# an issue with the PDO definition in the OD file, which should be
# checked when the file is loaded
eds_details = get_name(eds, index)
num_bytes = ceil(size / 8)

Expand Down Expand Up @@ -144,7 +148,25 @@ def parse_mpdo(num_elements, pdo_type, eds, data, cob_id):
f"MPDO type and definition do not match. "
f"Check eds file at [{pdo_type}sub0]")

eds_details = get_name(eds, mpdo.index)
try:
eds_details = get_name(eds, mpdo.index)
except KeyError as e:
raise FailedValidationError(data,
cob_id - MessageType.PDO1_TX.value[0],
cob_id,
__name__,
f"MPDO provided type index does not exist. "
f"Check provided index {str(e)}")

except ValueError:
raise FailedValidationError(data,
cob_id - MessageType.PDO1_TX.value[0],
cob_id,
__name__,
f"MPDO provided type index is missing "
f"attributes. Check OD file provided index "
f"[{format_bytes(mpdo.index)}")

return f"{eds_details[1]} - {decode(eds_details[0], mpdo.data)}"


Expand Down
10 changes: 7 additions & 3 deletions canopen_monitor/parse/sdo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import array
from .eds import EDS
from .utilities import FailedValidationError, get_name, decode
from .utilities import FailedValidationError, get_name, decode, format_bytes
from typing import List
from ..can import MessageType

Expand Down Expand Up @@ -1127,8 +1127,12 @@ def __parse_block_end_no_data(self, data):
def __set_name(self, eds, index: List[int]):
try:
values = get_name(eds, index)
except TypeError:
raise ValueError(f"Unable to eds content at index {str(index)}")
except KeyError as e:
raise ValueError(f"SDO provided index does not exist. Check "
f"provided index {str(e)}")
except ValueError:
raise ValueError(f"SDO provided index is missing attributes. "
f"Check OD file index {format_bytes(index)}")

self.__inProgressType = values[0]
self.__inProgressName = values[1]
8 changes: 4 additions & 4 deletions canopen_monitor/parse/sync.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from .eds import EDS
from .utilities import FailedValidationError, decode, UNSIGNED8
from .utilities import FailedValidationError, decode, DataType


def parse(cob_id: int, data: bytes, eds: EDS):
if len(data) > 1:
raise FailedValidationError(data, cob_id, cob_id, __name__,
'SYNC message is outside of bounds '
'limit of 1 byte, {len(data)} provided')
return f'SYNC - {decode(UNSIGNED8, data)}'
f'SYNC message is outside of bounds '
f'limit of 1 byte, {len(data)} provided')
return f'SYNC - {decode(DataType.UNSIGNED8.value, data)}'
139 changes: 91 additions & 48 deletions canopen_monitor/parse/utilities.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import array
import datetime
from datetime import datetime, timedelta
from struct import unpack
from .eds import EDS
from typing import List, Union
from enum import Enum


class FailedValidationError(Exception):
Expand Down Expand Up @@ -32,87 +33,118 @@ def __init__(self,
self.parse_type = parse_type
self.sub_type = sub_type
self.message = message
self.time_occured = datetime.datetime.now()
self.time_occured = datetime.now()
super().__init__(self.message)


def get_name(eds_config: EDS, index: List[int]) -> (str, str):
def get_name(eds_config: EDS, index: Union[List[int], bytes]) -> (str, str):
"""
Get the name and data type for a given index
:param eds_config: An EDS file for the current node
:param index: the index and subindex to retrieve data from
expected to be length 3. (not validated)
:return: a tuple containing the name and data type as a string
:return: (str, str): a tuple containing the name and data type as a string
:raise: IndexError: The index or subindex failed to find a value in the
provided OD file
:raise: ValueError: The provided index/subindex does not contain a
parameter_name and data_type attribute
"""
index_bytes = list(map(lambda x: hex(x)[2:].rjust(2, '0'), index))
key = int('0x' + ''.join(index_bytes[:2]), 16)
subindex_key = int('0x' + ''.join(index_bytes[2:3]), 16)
current = eds_config[hex(key)]
if current is None:
return "Unknown", "Unknown"

try:
result = eds_config[hex(key)].parameter_name

if len(current) > 0:
result += " " + eds_config[hex(key)][subindex_key].parameter_name
defined_type = eds_config[hex(key)][subindex_key].data_type
else:
defined_type = eds_config[hex(key)].data_type

return defined_type, result

except IndexError:
return "Unknown", "Unknown"


BOOLEAN = '0x0001'
INTEGER8 = '0x0002'
INTEGER16 = '0x0003'
INTEGER32 = '0x0004'
UNSIGNED8 = '0x0005'
UNSIGNED16 = '0x0007'
UNSIGNED32 = '0x0003'
REAL32 = '0x0008'
VISIBLE_STRING = '0x0009'
OCTET_STRING = '0x000A'
UNICODE_STRING = '0x000B'
DOMAIN = '0x000F'
REAL64 = '0x0011'
INTEGER64 = '0x0015'
UNSIGNED64 = '0x001B'
UNKNOWN = 'Unknown'

current = eds_config[hex(key)]
result = eds_config[hex(key)].parameter_name

def decode(defined_type: str, data: List[int]) -> str:
if len(current) > 0:
result += " " + eds_config[hex(key)][subindex_key].parameter_name
defined_type = eds_config[hex(key)][subindex_key].data_type
else:
defined_type = eds_config[hex(key)].data_type

return defined_type, result


class DataType(Enum):
BOOLEAN = '0x0001'
INTEGER8 = '0x0002'
INTEGER16 = '0x0003'
INTEGER32 = '0x0004'
UNSIGNED8 = '0x0005'
UNSIGNED16 = '0x0006'
UNSIGNED32 = '0x0007'
REAL32 = '0x0008'
VISIBLE_STRING = '0x0009'
OCTET_STRING = '0x000A'
UNICODE_STRING = '0x000B'
TIME_OF_DAY = '0x000C'
TIME_DIFFERENCE = '0x000D'
DOMAIN = '0x000F'
INTEGER24 = '0x0010'
REAL64 = '0x0011'
INTEGER40 = '0x0012'
INTEGER48 = '0x0013'
INTEGER56 = '0x0014'
INTEGER64 = '0x0015'
UNSIGNED24 = '0x0016'
UNSIGNED40 = '0x0018'
UNSIGNED48 = '0x0019'
UNSIGNED56 = '0x001A'
UNSIGNED64 = '0x001B'
PDO_COMMUNICATION_PARAMETER = '0x0020'
PDO_MAPPING = '0x0021'
SDO_PARAMETER = '0x0022'
IDENTITY = '0x0023'

# Data Type Groupings
UNSIGNED_INTEGERS = (UNSIGNED8, UNSIGNED16, UNSIGNED32, UNSIGNED24,
UNSIGNED40, UNSIGNED48, UNSIGNED56, UNSIGNED64)

SIGNED_INTEGERS = (INTEGER8, INTEGER16, INTEGER32, INTEGER24,
INTEGER40, INTEGER48, INTEGER56, INTEGER64)

FLOATING_POINTS = (REAL32, REAL64)

NON_FORMATTED = (DOMAIN, PDO_COMMUNICATION_PARAMETER, PDO_MAPPING,
SDO_PARAMETER, IDENTITY)


def decode(defined_type: Union[str, DataType], data: List[int]) -> str:
"""
Decodes data by defined type
:param defined_type: Hex constant for type
:param data: list of ints to be decoded
:return: Decoded data as string
:raise: ValueError: Indicates datatype provided is not supported
"""
if defined_type in (UNSIGNED8, UNSIGNED16, UNSIGNED32, UNSIGNED64):
if defined_type in DataType.UNSIGNED_INTEGERS.value:
result = str(int.from_bytes(data, byteorder="little", signed=False))
elif defined_type in (INTEGER8, INTEGER16, INTEGER32, INTEGER64):
elif defined_type in DataType.SIGNED_INTEGERS.value:
result = str(int.from_bytes(data, byteorder="little", signed=True))
elif defined_type == BOOLEAN:
elif defined_type == DataType.BOOLEAN.value:
if int.from_bytes(data, byteorder="little", signed=False) > 0:
result = str(True)
else:
result = str(False)
elif defined_type in (REAL32, REAL64):
elif defined_type in DataType.FLOATING_POINTS.value:
data = array.array('B', data).tobytes()
result = str(unpack('>f', data)[0])
elif defined_type == VISIBLE_STRING:
elif defined_type == DataType.VISIBLE_STRING.value:
data = array.array('B', data).tobytes()
result = data.decode('utf-8')
elif defined_type in (OCTET_STRING, DOMAIN):
elif defined_type == DataType.OCTET_STRING.value:
data = list(map(lambda x: hex(x)[2:].rjust(2, '0'), data))
result = '0x' + ''.join(data)
elif defined_type == UNICODE_STRING:
elif defined_type == DataType.UNICODE_STRING.value:
data = array.array('B', data).tobytes()
result = data.decode('utf-16-be')
elif defined_type == UNKNOWN:
elif defined_type == DataType.TIME_OF_DAY.value:
delta = get_time_values(data)
date = datetime(1984, 1, 1) + delta
result = date.isoformat()
elif defined_type == DataType.TIME_DIFFERENCE.value:
result = str(get_time_values(data))
elif defined_type in DataType.NON_FORMATTED.value:
result = format_bytes(data)
else:
raise ValueError(f"Invalid data type {defined_type}. "
Expand All @@ -121,6 +153,17 @@ def decode(defined_type: str, data: List[int]) -> str:
return result


def get_time_values(data: [int]) -> timedelta:
# Component ms is the time in milliseconds after midnight. Component
# days is the number of days since January 1, 1984.
# Format UNSIGNED 28 (ms), VOID4, UNSIGNED 16 (Days)
ms_raw = data[:4]
ms_raw[3] = ms_raw[3] >> 4
ms = int.from_bytes(ms_raw, byteorder="little", signed=False)
days = int.from_bytes(data[5:7], byteorder="little", signed=False)
return timedelta(days=days, milliseconds=ms)


def format_bytes(data: Union[List[int], bytes]) -> str:
return ' '.join(list(map(lambda x: hex(x)[2:]
.upper()
Expand Down
Loading

0 comments on commit 7a4270b

Please sign in to comment.