diff --git a/src/python_testing/TC_IDM_3_2.py b/src/python_testing/TC_IDM_3_2.py index 19baa24d037be8..c1b04b5d73d5bd 100644 --- a/src/python_testing/TC_IDM_3_2.py +++ b/src/python_testing/TC_IDM_3_2.py @@ -24,7 +24,7 @@ async def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, if desired_type == MatterIntEnum: all_attributes_of_type = [attribute for attribute in all_attributes if type( attribute.attribute_type.Type) == type(ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type).Type)] - + elif desired_type == IntFlag: if hasattr(cluster, 'Attributes'): attributes_class = getattr(cluster, 'Attributes') @@ -37,7 +37,7 @@ async def all_type_attributes_for_cluster(self, cluster: ClusterObjects.Cluster, else: all_attributes_of_type = [attribute for attribute in all_attributes if attribute.attribute_type == ClusterObjects.ClusterObjectFieldDescriptor(Type=desired_type)] - + return all_attributes_of_type def all_device_clusters(self) -> set: @@ -70,13 +70,13 @@ def pick_writable_value(self, attribute): elif bytes in types_in_union or attribute_type == bytes: value = bytes("Hello World", "utf-8") elif IntFlag in types_in_union or attribute_type == IntFlag: - value = None # Todo: Fill in + value = None # Todo: Fill in elif list in types_in_union or attribute_type == list: value = [1, 2, 3, 4, 5, 6] elif ClusterObjects.ClusterObjectFieldDescriptor(Type=MatterIntEnum).Type in types_in_union or attribute_type == ClusterObjects.ClusterObjectFieldDescriptor(Type=MatterIntEnum).Type: - value = None # Todo: Fill in + value = None # Todo: Fill in elif ClusterObject in types_in_union or attribute_type == ClusterObject: - value = None # Todo: Fill in + value = None # Todo: Fill in else: value = None return value @@ -89,7 +89,7 @@ async def check_attribute_write_for_type(self, desired_attribute_type: type) -> all_types = list(set(all_types) & self.device_attributes) attributes_of_type = set(all_types) attributes_of_type_on_device = attributes_of_type.intersection(set(clusterdata.keys())) - + if attributes_of_type_on_device or attributes_of_type: print(f'attributes_of_type_on_device: {attributes_of_type_on_device}, attributes_of_type: {attributes_of_type}') chosen_attribute = next(iter(attributes_of_type_on_device)) @@ -109,11 +109,10 @@ async def test_TC_IDM_3_2(self): self.xml_clusters, self.problems = build_xml_clusters() all_clusters = [cluster for cluster in Clusters.ClusterObjects.ALL_ATTRIBUTES] expected_descriptor_attributes = ClusterObjects.ALL_ATTRIBUTES[Clusters.Objects.Descriptor.id] - + read_request = await self.default_controller.ReadAttribute(self.dut_node_id, [(0, Clusters.Objects.Descriptor)]) returned_attributes = [a for a in read_request[0][Clusters.Objects.Descriptor].keys() if a != Clusters.Attribute.DataVersion] - self.device_clusters = self.all_device_clusters() self.device_attributes = self.all_device_attributes() @@ -142,31 +141,31 @@ async def test_TC_IDM_3_2(self): if cluster_id in Clusters.ClusterObjects.ALL_ATTRIBUTES and attribute_id in Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id]: attribute = Clusters.ClusterObjects.ALL_ATTRIBUTES[cluster_id][attribute_id] if hasattr(attribute, 'value') and write_access >= Clusters.AccessControl.Enums.AccessControlEntryPrivilegeEnum.kOperate: - + writable_attributes.append(attribute) if cluster_id not in command_map: command_map[cluster_id] = self.xml_clusters[cluster_id].command_map writable_clusters = [] - + for writable_attribute in writable_attributes: cluster = Clusters.ClusterObjects.ALL_CLUSTERS[writable_attribute.cluster_id] if cluster not in writable_clusters: writable_clusters.append(cluster) - + writable_attributes_iter = iter(writable_attributes) chosen_writable_attribute = next(writable_attributes_iter) - + chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id] if chosen_writable_cluster in self.device_clusters and chosen_writable_attribute in self.device_attributes: output_1 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute]) - + if output_1: # Skip if no output -- e.g., happens with Objects.PumpConfigurationAndControl.Attributes.LifetimeRunningHours endpoint = next(iter(output_1.attributes)) - + original_value = output_1.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute] value = self.pick_writable_value(chosen_writable_attribute) @@ -175,13 +174,14 @@ async def test_TC_IDM_3_2(self): asserts.assert_not_equal(output_1.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute], output_2.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute], - "Output did not change") + "Output did not change") asserts.assert_equal(output_2.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute], value) write_output = await self.default_controller.WriteAttribute(self.dut_node_id, [(endpoint, chosen_writable_attribute(value=original_value))]) asserts.assert_equal(write_output[0].Status, 0, "Write failed") output_3 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute]) - asserts.assert_equal(output_3.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute], original_value, "Failure writing back to original value") + asserts.assert_equal(output_3.attributes[endpoint][chosen_writable_cluster] + [chosen_writable_attribute], original_value, "Failure writing back to original value") # Step 2 @@ -190,7 +190,7 @@ async def test_TC_IDM_3_2(self): # TH reads the attribute value and saves as original. TH sends a WriteRequestMessage to the DUT to write to the attribute on all endpoints. It should set a value that is different than original. # Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`. - + chosen_writable_attribute = next(writable_attributes_iter) chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id] if chosen_writable_cluster in self.device_clusters and chosen_writable_attribute in self.device_attributes: @@ -205,8 +205,8 @@ async def test_TC_IDM_3_2(self): asserts.assert_not_equal(output_1.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute], output_2.attributes[endpoint][chosen_writable_cluster][chosen_writable_attribute], - "Output did not change") - + "Output did not change") + # Step 3 # TH selects a writeable attribute of type bool that is present on at least one endpoint. @@ -269,7 +269,7 @@ async def test_TC_IDM_3_2(self): # Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`. await self.check_attribute_write_for_type(ClusterObject) -# + # Step 10 # TH selects a writeable attribute of type list that is present on at least one endpoint. @@ -287,7 +287,7 @@ async def test_TC_IDM_3_2(self): # Verify that the DUT sends a WriteResponseMessage with any status except UNSUPPORTED_WRITE or DATA_VERSION_MISMATCH. If the Status is SUCCESS, verify the updated value by sending a ReadRequestMessage for all affected paths. If the status is SUCCESS, send a WriteRequestMessage to set the value back to `original`. await self.check_attribute_write_for_type(MatterIntEnum) -# + # Step 12 # TH selects a writeable attribute of type bitmap that is present on at least one endpoint. @@ -397,10 +397,10 @@ async def test_TC_IDM_3_2(self): value = self.pick_writable_value(chosen_writable_attribute) data_version = output_1.attributes[endpoint][Clusters.Thermostat][Clusters.Attribute.DataVersion] result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value), endpoint_id=endpoint) - + asserts.assert_true(isinstance(result.Reason, InteractionModelError), msg="Unexpected success writing invalid attribute") - + # Step 18 # TH sends a ReadRequest message to the DUT to read any writeable attribute on any cluster. @@ -409,7 +409,7 @@ async def test_TC_IDM_3_2(self): # TH sends a WriteRequestMessage to the DUT to modify the value of the selected attribute no DataVersion indicated. # TH sends a second WriteRequestMessage to the DUT to modify the value of an attribute with the dataversion field set to the value received earlier. # Verify that the DUT sends a Write Response message with the error DATA_VERSION_MISMATCH for the second Write request. - + chosen_writable_attribute = next(writable_attributes_iter) chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[chosen_writable_attribute.cluster_id] output_1 = await self.default_controller.Read(self.dut_node_id, [chosen_writable_attribute]) @@ -419,11 +419,10 @@ async def test_TC_IDM_3_2(self): data_version = output_1.attributes[endpoint][Clusters.Thermostat][Clusters.Attribute.DataVersion] result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value), endpoint_id=endpoint) result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value + 1), endpoint_id=endpoint) - + asserts.assert_true(isinstance(result.Reason, InteractionModelError), msg="Unexpected success writing invalid attribute") - # Step 19 # TH sends the WriteRequestMessage to the DUT to modify the value of a specific attribute data that needs Timed Write transaction to write and this action is not part of a Timed Write transaction. @@ -432,15 +431,15 @@ async def test_TC_IDM_3_2(self): # On the TH verify that the DUT sends a status code NEEDS_TIMED_INTERACTION. command_map_iter = iter(command_map) - + while True: - + cluster_id = next(command_map_iter) command_dict = command_map[cluster_id] chosen_writable_cluster = Clusters.ClusterObjects.ALL_CLUSTERS[cluster_id] if command_dict and chosen_writable_cluster in writable_clusters: break - + for writable_attribute in writable_attributes: if writable_attribute.cluster_id == chosen_writable_cluster.id: chosen_writable_attribute = writable_attribute @@ -449,7 +448,7 @@ async def test_TC_IDM_3_2(self): chosen_command_str = next(iter(command_dict)) value = self.pick_writable_value(chosen_writable_attribute) chosen_command = getattr(chosen_writable_cluster.Commands, chosen_command_str) - + await self.send_single_cmd(timedRequestTimeoutMs=1000, cmd=chosen_command(is_client=False)) value = self.pick_writable_value(chosen_writable_attribute) result = await self.write_single_attribute(attribute_value=chosen_writable_attribute(value=value), endpoint_id=endpoint) @@ -460,5 +459,6 @@ async def test_TC_IDM_3_2(self): # Passing any of these (e.g. Clusters.HepaFilterMonitoring.Commands.ResetCondition(is_client=False) gives a TypeError with an # unexpected argument, so passing this to await self.send_single_cmd(cmd=cmd) can't be done. How to filter out commands that work?) + if __name__ == "__main__": default_matter_test_main()