Skip to content

Commit

Permalink
Fixed more styling issues on TC_IDM_3_2.py
Browse files Browse the repository at this point in the history
  • Loading branch information
austina-csa authored Oct 28, 2024
1 parent 9acbb3a commit cd1f6fd
Showing 1 changed file with 30 additions and 30 deletions.
60 changes: 30 additions & 30 deletions src/python_testing/TC_IDM_3_2.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster,
if desired_type == MatterIntEnum:
all_attributes_of_type = [attribute for attribute in all_attributes if type(
attribute.attribute_type.Type) == type(ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type).Type)]

elif desired_type == IntFlag:
if hasattr(cluster, 'Attributes'):
attributes_class = getattr(cluster, 'Attributes')
Expand All @@ -37,7 +37,7 @@ async def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster,
else:
all_attributes_of_type = [attribute for attribute in all_attributes if attribute.attribute_type ==
ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type)]

return all_attributes_of_type

def all_device_clusters(self) -> set:
Expand Down Expand Up @@ -70,13 +70,13 @@ def pick_writable_value(self, attribute):
elif bytes in types_in_union or attribute_type == bytes:
value = bytes("Hello World", "utf-8")
elif IntFlag in types_in_union or attribute_type == IntFlag:
value = None # Todo: Fill in
value = None # Todo: Fill in
elif list in types_in_union or attribute_type == list:
value = [1, 2, 3, 4, 5, 6]
elif ClusterObjects.ClusterObjectFieldDescriptor(Type=MatterIntEnum).Type in types_in_union or attribute_type == ClusterObjects.ClusterObjectFieldDescriptor(Type=MatterIntEnum).Type:
value = None # Todo: Fill in
value = None # Todo: Fill in
elif ClusterObject in types_in_union or attribute_type == ClusterObject:
value = None # Todo: Fill in
value = None # Todo: Fill in
else:
value = None
return value
Expand All @@ -89,7 +89,7 @@ async def check_attribute_write_for_type(self, desired_attribute_type: type) ->
all_types = list(set(all_types) & self.device_attributes)
attributes_of_type = set(all_types)
attributes_of_type_on_device = attributes_of_type.intersection(set(clusterdata.keys()))

if attributes_of_type_on_device or attributes_of_type:
print(f'attributes_of_type_on_device: {attributes_of_type_on_device}, attributes_of_type: {attributes_of_type}')
chosen_attribute = next(iter(attributes_of_type_on_device))
Expand All @@ -109,11 +109,10 @@ async def test_TC_IDM_3_2(self):
self.xml_clusters, self.problems = build_xml_clusters()
all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES]
expected_descriptor_attributes = ClusterObjects.ALL_ATTRIBUTES[Clusters.Objects.Descriptor.id]

read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor)])
returned_attributes = [a for a in read_request[0][Clusters.Objects.Descriptor].keys() if a !=
Clusters.Attribute.DataVersion]


self.device_clusters = self.all_device_clusters()
self.device_attributes = self.all_device_attributes()
Expand Down Expand Up @@ -142,31 +141,31 @@ async def test_TC_IDM_3_2(self):
if cluster_id in Clusters.ClusterObjects.ALL_ATTRIBUTES and attribute_id in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]:
attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id]
if hasattr(attribute, 'value') and write_access >= Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kOperate:

writable_attributes.append(attribute)
if cluster_id not in command_map:
command_map[cluster_id] = self.xml_clusters[cluster_id].command_map

writable_clusters = []

for writable_attribute in writable_attributes:
cluster = Clusters.ClusterObjects.ALL_CLUSTERS[writable_attribute.cluster_id]
if cluster not in writable_clusters:
writable_clusters.append(cluster)

writable_attributes_iter = iter(writable_attributes)

chosen_writable_attribute = next(writable_attributes_iter)

chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id]
if chosen_writable_cluster in self.device_clusters and chosen_writable_attribute in self.device_attributes:

output_1 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute])

if output_1: # Skip if no output -- e.g., happens with Objects.PumpConfigurationAndControl.Attributes.LifetimeRunningHours

endpoint = next(iter(output_1.attributes))

original_value = output_1.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute]

value = self.pick_writable_value(chosen_writable_attribute)
Expand All @@ -175,13 +174,14 @@ async def test_TC_IDM_3_2(self):

asserts.assert_not_equal(output_1.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute],
output_2.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute],
"Output did not change")
"Output did not change")
asserts.assert_equal(output_2.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute], value)
write_output = await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, chosen_writable_attribute(value=original_value))])
asserts.assert_equal(write_output[0].Status, 0, "Write failed")

output_3 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute])
asserts.assert_equal(output_3.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute], original_value, "Failure writing back to original value")
asserts.assert_equal(output_3.attributes[endpoint][chosen_writable_cluster]
[chosen_writable_attribute], original_value, "Failure writing back to original value")

# Step 2

Expand All @@ -190,7 +190,7 @@ async def test_TC_IDM_3_2(self):
# TH reads the attribute value and saves as original. TH sends a WriteRequestMessage to the DUT to write to the attribute on all endpoints. It should set a value that is different than original.

# Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`.

chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id]
if chosen_writable_cluster in self.device_clusters and chosen_writable_attribute in self.device_attributes:
Expand All @@ -205,8 +205,8 @@ async def test_TC_IDM_3_2(self):

asserts.assert_not_equal(output_1.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute],
output_2.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute],
"Output did not change")
"Output did not change")

# Step 3

# TH selects a writeable attribute of type bool that is present on at least one endpoint.
Expand Down Expand Up @@ -269,7 +269,7 @@ async def test_TC_IDM_3_2(self):
# Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`.

await self.check_attribute_write_for_type(ClusterObject)
#

# Step 10

# TH selects a writeable attribute of type list that is present on at least one endpoint.
Expand All @@ -287,7 +287,7 @@ async def test_TC_IDM_3_2(self):
# Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`.

await self.check_attribute_write_for_type(MatterIntEnum)
#

# Step 12

# TH selects a writeable attribute of type bitmap that is present on at least one endpoint.
Expand Down Expand Up @@ -397,10 +397,10 @@ async def test_TC_IDM_3_2(self):
value = self.pick_writable_value(chosen_writable_attribute)
data_version = output_1.attributes[endpoint][Clusters.Thermostat][Clusters.Attribute.DataVersion]
result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value), endpoint_id=endpoint)

asserts.assert_true(isinstance(result.Reason, InteractionModelError),
msg="Unexpected success writing invalid attribute")

# Step 18

# TH sends a ReadRequest message to the DUT to read any writeable attribute on any cluster.
Expand All @@ -409,7 +409,7 @@ async def test_TC_IDM_3_2(self):
# TH sends a WriteRequestMessage to the DUT to modify the value of the selected attribute no DataVersion indicated.
# TH sends a second WriteRequestMessage to the DUT to modify the value of an attribute with the dataversion field set to the value received earlier.
# Verify that the DUT sends a Write Response message with the error DATA_VERSION_MISMATCH for the second Write request.

chosen_writable_attribute = next(writable_attributes_iter)
chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id]
output_1 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute])
Expand All @@ -419,11 +419,10 @@ async def test_TC_IDM_3_2(self):
data_version = output_1.attributes[endpoint][Clusters.Thermostat][Clusters.Attribute.DataVersion]
result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value), endpoint_id=endpoint)
result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value + 1), endpoint_id=endpoint)

asserts.assert_true(isinstance(result.Reason, InteractionModelError),
msg="Unexpected success writing invalid attribute")


# Step 19

# TH sends the WriteRequestMessage to the DUT to modify the value of a specific attribute data that needs Timed Write transaction to write and this action is not part of a Timed Write transaction.
Expand All @@ -432,15 +431,15 @@ async def test_TC_IDM_3_2(self):
# On the TH verify that the DUT sends a status code NEEDS_TIMED_INTERACTION.

command_map_iter = iter(command_map)

while True:

cluster_id = next(command_map_iter)
command_dict = command_map[cluster_id]
chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id]
if command_dict and chosen_writable_cluster in writable_clusters:
break

for writable_attribute in writable_attributes:
if writable_attribute.cluster_id == chosen_writable_cluster.id:
chosen_writable_attribute = writable_attribute
Expand All @@ -449,7 +448,7 @@ async def test_TC_IDM_3_2(self):
chosen_command_str = next(iter(command_dict))
value = self.pick_writable_value(chosen_writable_attribute)
chosen_command = getattr(chosen_writable_cluster.Commands, chosen_command_str)

await self.send_single_cmd(timedRequestTimeoutMs=1000, cmd=chosen_command(is_client=False))
value = self.pick_writable_value(chosen_writable_attribute)
result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value), endpoint_id=endpoint)
Expand All @@ -460,5 +459,6 @@ async def test_TC_IDM_3_2(self):
# Passing any of these (e.g. Clusters.HepaFilterMonitoring.Commands.ResetCondition(is_client=False) gives a TypeError with an
# unexpected argument, so passing this to await self.send_single_cmd(cmd=cmd) can't be done. How to filter out commands that work?)


if __name__ == "__main__":
default_matter_test_main()

0 comments on commit cd1f6fd

Please sign in to comment.