Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
Add support for invalid index/subindex requests
Browse files Browse the repository at this point in the history
  • Loading branch information
Boneill3 committed May 21, 2021
1 parent 93310de commit d06cc89
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 17 deletions.
8 changes: 3 additions & 5 deletions canopen_monitor/parse/canopen.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
emcy as EMCYParser, \
time as TIMEParser
from .sdo import SDOParser
from .utilities import FailedValidationError
from .utilities import FailedValidationError, format_bytes


class CANOpenParser:
Expand Down Expand Up @@ -65,8 +65,6 @@ def parse(self, message: Message) -> str:
message.data,
eds_config)
except (FailedValidationError, TypeError):
parsed_message = ' '.join(list(map(lambda x: hex(x)[2:]
.upper()
.rjust(2, '0'),
message.data)))
parsed_message = format_bytes(message.data)

return parsed_message
4 changes: 2 additions & 2 deletions canopen_monitor/parse/pdo.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def parse(cob_id: int, data: bytes, eds: EDS):
f"expected between 1 and 8")
try:
eds_elements = eds[hex(pdo_type)][0]
except TypeError:
except (TypeError, IndexError):
raise FailedValidationError(data,
cob_id - MessageType.PDO1_TX.value[0],
cob_id,
Expand Down Expand Up @@ -100,7 +100,7 @@ def parse_pdo(num_elements, pdo_type, cob_id, eds, data):
for i in range(num_elements, 0, -1):
try:
eds_record = eds[hex(pdo_type)][i]
except TypeError:
except (TypeError, IndexError):
raise FailedValidationError(data,
cob_id - MessageType.PDO1_TX.value[0],
cob_id,
Expand Down
30 changes: 22 additions & 8 deletions canopen_monitor/parse/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import datetime
from struct import unpack
from .eds import EDS
from typing import List
from typing import List, Union


class FailedValidationError(Exception):
Expand Down Expand Up @@ -51,15 +51,19 @@ def get_name(eds_config: EDS, index: List[int]) -> (str, str):
if current is None:
return "Unknown", "Unknown"

result = eds_config[hex(key)].parameter_name
try:
result = eds_config[hex(key)].parameter_name

if len(current) > 0:
result += " " + eds_config[hex(key)][subindex_key].parameter_name
defined_type = eds_config[hex(key)][subindex_key].data_type
else:
defined_type = eds_config[hex(key)].data_type
if len(current) > 0:
result += " " + eds_config[hex(key)][subindex_key].parameter_name
defined_type = eds_config[hex(key)][subindex_key].data_type
else:
defined_type = eds_config[hex(key)].data_type

return defined_type, result

return defined_type, result
except IndexError:
return "Unknown", "Unknown"


BOOLEAN = '0x0001'
Expand All @@ -77,6 +81,7 @@ def get_name(eds_config: EDS, index: List[int]) -> (str, str):
REAL64 = '0x0011'
INTEGER64 = '0x0015'
UNSIGNED64 = '0x001B'
UNKNOWN = 'Unknown'


def decode(defined_type: str, data: List[int]) -> str:
Expand Down Expand Up @@ -107,8 +112,17 @@ def decode(defined_type: str, data: List[int]) -> str:
elif defined_type == UNICODE_STRING:
data = array.array('B', data).tobytes()
result = data.decode('utf-16-be')
elif defined_type == UNKNOWN:
result = format_bytes(data)
else:
raise ValueError(f"Invalid data type {defined_type}. "
f"Unable to decode data {str(data)}")

return result


def format_bytes(data: Union[List[int], bytes]) -> str:
return ' '.join(list(map(lambda x: hex(x)[2:]
.upper()
.rjust(2, '0'),
data)))
4 changes: 2 additions & 2 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,7 @@
;StorageLocation=RAM
DataType=0x0005
AccessType=rw
DefaultValue=0xFF
DefaultValue=0x01
PDOMapping=0
[1A03sub1]
Expand All @@ -1652,7 +1652,7 @@
;StorageLocation=RAM
DataType=0x0007
AccessType=rw
DefaultValue=0x00000000
DefaultValue=0x31010620
PDOMapping=0
[1A03sub2]
Expand Down
11 changes: 11 additions & 0 deletions tests/spec_pdo_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from canopen_monitor.parse import eds
from canopen_monitor.parse.pdo import parse
from canopen_monitor.parse.utilities import FailedValidationError
from tests import TEST_EDS


Expand Down Expand Up @@ -62,3 +63,13 @@ def test_mpdo_with_SAM(self):
self.assertEqual("Orientation orientation - 1.0",
parse(0x380, pdo_message, self.eds_data),
"Error on MPDO SAM Message parse")

def test_pdo_trasmit_with_invalid_index(self):
"""
Test PDO transmit with invalid OD File index
"""
pdo_message = [0x3f, 0x80, 0x0, 0x0]
self.assertEqual("Unknown - 3F 80 00 00",
parse(0x480, pdo_message, self.eds_data),
"Error on PDO Message parse")

29 changes: 29 additions & 0 deletions tests/spec_sdo_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,3 +546,32 @@ def test_invalid_payload(self):

self.assertEqual("Invalid SDO payload length, expected 8, received "
"0", str(context.exception))

def test_expedited_invalid_index(self):
"""
Text expedited SDO transfer with an invalid index being used
"""
parser = SDOParser()
client_initiate_message = [0x27, 0x10, 0x17, 0x00, 0x0A, 0x00, 0x00,
0x00]
self.assertEqual("Downloaded - Unknown: 0A 00 00 00",
parser.parse(0x600, client_initiate_message,
self.eds_data),
"Error on Client Initiate Message")
self.assertEqual(False, parser.is_complete,
"Parser should be incomplete")

def test_expedited_invalid_subindex(self):
"""
Text expedited SDO transfer with a valid index and invalid subindex
being used
"""
parser = SDOParser()
client_initiate_message = [0x27, 0x10, 0x18, 0x07, 0x0A, 0x00, 0x00,
0x00]
self.assertEqual("Downloaded - Unknown: 0A 00 00 00",
parser.parse(0x600, client_initiate_message,
self.eds_data),
"Error on Client Initiate Message")
self.assertEqual(False, parser.is_complete,
"Parser should be incomplete")

0 comments on commit d06cc89

Please sign in to comment.