From d292892f68cc17b45357802e7611c23ca7a07b2d Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Tue, 17 Sep 2024 15:51:16 +0200 Subject: [PATCH] [Python] Process attribute cache updates in Python thread (#35557) * [Python] Process attribute cache updates in Python thread Instead of processing the attribute update in the SDK thread, process them on request in the Python thread. This avoids acks being sent back too late to the device after the last DataReport if there are many attribute updates sent at once. Currently still the same data model and processing is done. There is certainly also room for optimization to make this more efficient. * Get updated attribute values Make sure to get the attribute values again after each command to get the updated attribute cache. * Reference ReadEvent/ReadAttribute APIs on dev controller object --- .../repl/Matter_Basic_Interactions.ipynb | 4 +- src/controller/python/chip/ChipDeviceCtrl.py | 132 ++++++++++-------- .../python/chip/clusters/Attribute.py | 72 +++++----- .../test/test_scripts/cluster_objects.py | 9 +- 4 files changed, 116 insertions(+), 101 deletions(-) diff --git a/docs/guides/repl/Matter_Basic_Interactions.ipynb b/docs/guides/repl/Matter_Basic_Interactions.ipynb index 41c1c788655612..bc021aec730fa9 100644 --- a/docs/guides/repl/Matter_Basic_Interactions.ipynb +++ b/docs/guides/repl/Matter_Basic_Interactions.ipynb @@ -3504,7 +3504,7 @@ "source": [ "#### Read Events:\n", "\n", - "A `ReadEvents` API exists that behaves similarly to the `ReadAttributes` API. It permits the same degrees of wildcard expression as its counterpart and follows the same format for expressing all wildcard permutations." + "A `ReadEvent` API exists that behaves similarly to the `ReadAttribute` API. It permits the same degrees of wildcard expression as its counterpart and follows the same format for expressing all wildcard permutations." ] }, { @@ -3609,7 +3609,7 @@ "source": [ "### Subscription Interaction\n", "\n", - "To subscribe to a Node, the same `ReadAttributes` API is used to trigger a subscription, with a valid `reportInterval` tuple passed in being used as a way to indicate the request to create a subscription." + "To subscribe to a Node, the same `ReadAttribute` API is used to trigger a subscription, with a valid `reportInterval` tuple passed in being used as a way to indicate the request to create a subscription." ] }, { diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index d76d415d746945..8c751f7f791dd0 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -1433,20 +1433,23 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[ else: raise ValueError("Unsupported Attribute Path") - async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[ - None, # Empty tuple, all wildcard - typing.Tuple[int], # Endpoint - # Wildcard endpoint, Cluster id present - typing.Tuple[typing.Type[ClusterObjects.Cluster]], - # Wildcard endpoint, Cluster + Attribute present - typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], - # Wildcard attribute id - typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], - # Concrete path - typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], - # Directly specified attribute path - ClusterAttribute.AttributePath - ]]] = None, + async def Read( + self, + nodeid: int, + attributes: typing.Optional[typing.List[typing.Union[ + None, # Empty tuple, all wildcard + typing.Tuple[int], # Endpoint + # Wildcard endpoint, Cluster id present + typing.Tuple[typing.Type[ClusterObjects.Cluster]], + # Wildcard endpoint, Cluster + Attribute present + typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], + # Wildcard attribute id + typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], + # Concrete path + typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], + # Directly specified attribute path + ClusterAttribute.AttributePath + ]]] = None, dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[ typing.Union[ None, # Empty tuple, all wildcard @@ -1461,10 +1464,11 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing # Concrete path typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] ]]] = None, - eventNumberFilter: typing.Optional[int] = None, - returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None, - fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, - payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): + eventNumberFilter: typing.Optional[int] = None, + returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None, + fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, + payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD + ): ''' Read a list of attributes and/or events from a target node @@ -1534,33 +1538,43 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing eventPaths = [self._parseEventPathTuple( v) for v in events] if events else None - ClusterAttribute.Read(future=future, eventLoop=eventLoop, device=device.deviceProxy, devCtrl=self, + transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject) + ClusterAttribute.Read(transaction, device=device.deviceProxy, attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths, - eventNumberFilter=eventNumberFilter, returnClusterObject=returnClusterObject, + eventNumberFilter=eventNumberFilter, subscriptionParameters=ClusterAttribute.SubscriptionParameters( reportInterval[0], reportInterval[1]) if reportInterval else None, fabricFiltered=fabricFiltered, keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error() - return await future + await future - async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[ - None, # Empty tuple, all wildcard - typing.Tuple[int], # Endpoint - # Wildcard endpoint, Cluster id present - typing.Tuple[typing.Type[ClusterObjects.Cluster]], - # Wildcard endpoint, Cluster + Attribute present - typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], - # Wildcard attribute id - typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], - # Concrete path - typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], - # Directly specified attribute path - ClusterAttribute.AttributePath - ]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, - returnClusterObject: bool = False, - reportInterval: typing.Optional[typing.Tuple[int, int]] = None, - fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, - payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): + if result := transaction.GetSubscriptionHandler(): + return result + else: + return transaction.GetReadResponse() + + async def ReadAttribute( + self, + nodeid: int, + attributes: typing.Optional[typing.List[typing.Union[ + None, # Empty tuple, all wildcard + typing.Tuple[int], # Endpoint + # Wildcard endpoint, Cluster id present + typing.Tuple[typing.Type[ClusterObjects.Cluster]], + # Wildcard endpoint, Cluster + Attribute present + typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], + # Wildcard attribute id + typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], + # Concrete path + typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], + # Directly specified attribute path + ClusterAttribute.AttributePath + ]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, + returnClusterObject: bool = False, + reportInterval: typing.Optional[typing.Tuple[int, int]] = None, + fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, + payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD + ): ''' Read a list of attributes from a target node, this is a wrapper of DeviceController.Read() @@ -1629,24 +1643,28 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.Li else: return res.attributes - async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[ - None, # Empty tuple, all wildcard - typing.Tuple[str, int], # all wildcard with urgency set - typing.Tuple[int, int], # Endpoint, - # Wildcard endpoint, Cluster id present - typing.Tuple[typing.Type[ClusterObjects.Cluster], int], - # Wildcard endpoint, Cluster + Event present - typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int], - # Wildcard event id - typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int], - # Concrete path - typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] - ]], eventNumberFilter: typing.Optional[int] = None, - fabricFiltered: bool = True, - reportInterval: typing.Optional[typing.Tuple[int, int]] = None, - keepSubscriptions: bool = False, - autoResubscribe: bool = True, - payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): + async def ReadEvent( + self, + nodeid: int, + events: typing.List[typing.Union[ + None, # Empty tuple, all wildcard + typing.Tuple[str, int], # all wildcard with urgency set + typing.Tuple[int, int], # Endpoint, + # Wildcard endpoint, Cluster id present + typing.Tuple[typing.Type[ClusterObjects.Cluster], int], + # Wildcard endpoint, Cluster + Event present + typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int], + # Wildcard event id + typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int], + # Concrete path + typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] + ]], eventNumberFilter: typing.Optional[int] = None, + fabricFiltered: bool = True, + reportInterval: typing.Optional[typing.Tuple[int, int]] = None, + keepSubscriptions: bool = False, + autoResubscribe: bool = True, + payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD + ): ''' Read a list of events from a target node, this is a wrapper of DeviceController.Read() diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index 84124814238969..4d5bc1d17ae1cc 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -314,14 +314,17 @@ class AttributeCache: returnClusterObject: bool = False attributeTLVCache: Dict[int, Dict[int, Dict[int, bytes]]] = field( default_factory=lambda: {}) - attributeCache: Dict[int, List[Cluster]] = field( - default_factory=lambda: {}) versionList: Dict[int, Dict[int, Dict[int, int]]] = field( default_factory=lambda: {}) + _attributeCacheUpdateNeeded: set[AttributePath] = field( + default_factory=lambda: set()) + _attributeCache: Dict[int, List[Cluster]] = field( + default_factory=lambda: {}) + def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, ValueDecodeFailure]): ''' Store data in TLV since that makes it easiest to eventually convert to either the - cluster or attribute view representations (see below in UpdateCachedData). + cluster or attribute view representations (see below in GetUpdatedAttributeCache()). ''' if (path.EndpointId not in self.attributeTLVCache): self.attributeTLVCache[path.EndpointId] = {} @@ -344,7 +347,10 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V clusterCache[path.AttributeId] = data - def UpdateCachedData(self, changedPathSet: set[AttributePath]): + # For this path the attribute cache still requires an update. + self._attributeCacheUpdateNeeded.add(path) + + def GetUpdatedAttributeCache(self) -> Dict[int, List[Cluster]]: ''' This converts the raw TLV data into a cluster object format. Two formats are available: @@ -381,12 +387,12 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType): except Exception as ex: return ValueDecodeFailure(value, ex) - for attributePath in changedPathSet: + for attributePath in self._attributeCacheUpdateNeeded: endpointId, clusterId, attributeId = attributePath.EndpointId, attributePath.ClusterId, attributePath.AttributeId - if endpointId not in self.attributeCache: - self.attributeCache[endpointId] = {} - endpointCache = self.attributeCache[endpointId] + if endpointId not in self._attributeCache: + self._attributeCache[endpointId] = {} + endpointCache = self._attributeCache[endpointId] if clusterId not in _ClusterIndex: # @@ -414,6 +420,8 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType): attributeType = _AttributeIndex[(clusterId, attributeId)][0] clusterCache[attributeType] = handle_attribute_view(endpointId, clusterId, attributeId, attributeType) + self._attributeCacheUpdateNeeded.clear() + return self._attributeCache class SubscriptionTransaction: @@ -434,12 +442,12 @@ def __init__(self, transaction: AsyncReadTransaction, subscriptionId, devCtrl): def GetAttributes(self): ''' Returns the attribute value cache tracking the latest state on the publisher. ''' - return self._readTransaction._cache.attributeCache + return self._readTransaction._cache.GetUpdatedAttributeCache() def GetAttribute(self, path: TypedAttributePath) -> Any: ''' Returns a specific attribute given a TypedAttributePath. ''' - data = self._readTransaction._cache.attributeCache + data = self._readTransaction._cache.GetUpdatedAttributeCache() if (self._readTransaction._cache.returnClusterObject): return eval(f'data[path.Path.EndpointId][path.ClusterType].{path.AttributeName}') @@ -650,6 +658,18 @@ def SetClientObjPointers(self, pReadClient, pReadCallback): def GetAllEventValues(self): return self._events + def GetReadResponse(self) -> AsyncReadTransaction.ReadResponse: + """Prepares and returns the ReadResponse object.""" + return self.ReadResponse( + attributes=self._cache.GetUpdatedAttributeCache(), + events=self._events, + tlvAttributes=self._cache.attributeTLVCache + ) + + def GetSubscriptionHandler(self) -> SubscriptionTransaction | None: + """Returns subscription transaction.""" + return self._subscription_handler + def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes): try: imStatus = chip.interaction_model.Status(status) @@ -716,7 +736,7 @@ def _handleSubscriptionEstablished(self, subscriptionId): if not self._future.done(): self._subscription_handler = SubscriptionTransaction( self, subscriptionId, self._devCtrl) - self._future.set_result(self._subscription_handler) + self._future.set_result(self) else: self._subscription_handler._subscriptionId = subscriptionId if self._subscription_handler._onResubscriptionSucceededCb is not None: @@ -745,8 +765,6 @@ def _handleReportBegin(self): pass def _handleReportEnd(self): - self._cache.UpdateCachedData(self._changedPathSet) - if (self._subscription_handler is not None): for change in self._changedPathSet: try: @@ -772,8 +790,7 @@ def _handleDone(self): if self._resultError is not None: self._future.set_exception(self._resultError.to_exception()) else: - self._future.set_result(AsyncReadTransaction.ReadResponse( - attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache)) + self._future.set_result(self) # # Decrement the ref on ourselves to match the increment that happened at allocation. @@ -1001,9 +1018,9 @@ def WriteGroupAttributes(groupId: int, devCtrl: c_void_p, attributes: List[Attri ) -def Read(future: Future, eventLoop, device, devCtrl, +def Read(transaction: AsyncReadTransaction, device, attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None, - events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True, + events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError: if (not attributes) and dataVersionFilters: @@ -1011,8 +1028,6 @@ def Read(future: Future, eventLoop, device, devCtrl, "Must provide valid attribute list when data version filters is not null") handle = chip.native.GetLibraryHandle() - transaction = AsyncReadTransaction( - future, eventLoop, devCtrl, returnClusterObject) attributePathsForCffi = None if attributes is not None: @@ -1119,25 +1134,6 @@ def Read(future: Future, eventLoop, device, devCtrl, return res -def ReadAttributes(future: Future, eventLoop, device, devCtrl, - attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None, - returnClusterObject: bool = True, - subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int: - return Read(future=future, eventLoop=eventLoop, device=device, - devCtrl=devCtrl, attributes=attributes, dataVersionFilters=dataVersionFilters, - events=None, returnClusterObject=returnClusterObject, - subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered) - - -def ReadEvents(future: Future, eventLoop, device, devCtrl, - events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True, - subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int: - return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl, attributes=None, - dataVersionFilters=None, events=events, eventNumberFilter=eventNumberFilter, - returnClusterObject=returnClusterObject, - subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered) - - def Init(): handle = chip.native.GetLibraryHandle() diff --git a/src/controller/python/test/test_scripts/cluster_objects.py b/src/controller/python/test/test_scripts/cluster_objects.py index 10c2ad2e268c30..e18d11905eb5d5 100644 --- a/src/controller/python/test/test_scripts/cluster_objects.py +++ b/src/controller/python/test/test_scripts/cluster_objects.py @@ -216,12 +216,12 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction): sub.SetAttributeUpdateCallback(subUpdate) try: - data = sub.GetAttributes() req = Clusters.OnOff.Commands.On() await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req) await asyncio.wait_for(event.wait(), timeout=11) + data = sub.GetAttributes() if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 1): raise ValueError("Current On/Off state should be 1") @@ -232,6 +232,7 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction): await asyncio.wait_for(event.wait(), timeout=11) + data = sub.GetAttributes() if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 0): raise ValueError("Current On/Off state should be 0") @@ -254,13 +255,12 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction): sub.SetAttributeUpdateCallback(subUpdate) try: - data = sub.GetAttributes() - req = Clusters.OnOff.Commands.On() await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req) await asyncio.wait_for(event.wait(), timeout=11) + data = sub.GetAttributes() cluster: Clusters.OnOff = data[1][Clusters.OnOff] if (not cluster.onOff): raise ValueError("Current On/Off state should be True") @@ -272,6 +272,7 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction): await asyncio.wait_for(event.wait(), timeout=11) + data = sub.GetAttributes() cluster: Clusters.OnOff = data[1][Clusters.OnOff] if (cluster.onOff): raise ValueError("Current On/Off state should be False") @@ -298,7 +299,6 @@ async def TestSubscribeZeroMinInterval(cls, devCtrl): logger.info("Test Subscription With MinInterval of 0") sub = await devCtrl.ReadAttribute(nodeid=NODE_ID, attributes=[Clusters.OnOff, Clusters.LevelControl], reportInterval=(0, 60)) - data = sub.GetAttributes() logger.info("Sending off command") @@ -315,6 +315,7 @@ async def TestSubscribeZeroMinInterval(cls, devCtrl): logger.info("Checking read back value is indeed 254") + data = sub.GetAttributes() if (data[1][Clusters.LevelControl][Clusters.LevelControl.Attributes.CurrentLevel] != 254): raise ValueError("Current Level should have been 254")