From 447e79a53b268a879e69bdc962087d16efc1bb3e Mon Sep 17 00:00:00 2001 From: Stefan Agner Date: Thu, 12 Sep 2024 18:54:46 +0200 Subject: [PATCH] [Python] Process attribute cache updates in Python thread Instead of processing the attribute update in the SDK thread, process them on request in the Python thread. This avoids acks being sent back too late to the device after the last DataReport if there are many attribute updates sent at once. Currently still the same data model and processing is done. There is certainly also room for optimization to make this more efficient. --- src/controller/python/chip/ChipDeviceCtrl.py | 130 ++++++++++-------- .../python/chip/clusters/Attribute.py | 66 ++++----- 2 files changed, 102 insertions(+), 94 deletions(-) diff --git a/src/controller/python/chip/ChipDeviceCtrl.py b/src/controller/python/chip/ChipDeviceCtrl.py index d76d415d746945..a91eb5f03c0bbd 100644 --- a/src/controller/python/chip/ChipDeviceCtrl.py +++ b/src/controller/python/chip/ChipDeviceCtrl.py @@ -1433,20 +1433,23 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[ else: raise ValueError("Unsupported Attribute Path") - async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[ - None, # Empty tuple, all wildcard - typing.Tuple[int], # Endpoint - # Wildcard endpoint, Cluster id present - typing.Tuple[typing.Type[ClusterObjects.Cluster]], - # Wildcard endpoint, Cluster + Attribute present - typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], - # Wildcard attribute id - typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], - # Concrete path - typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], - # Directly specified attribute path - ClusterAttribute.AttributePath - ]]] = None, + async def Read( + self, + nodeid: int, + attributes: typing.Optional[typing.List[typing.Union[ + None, # Empty tuple, all wildcard + typing.Tuple[int], # Endpoint + # Wildcard endpoint, Cluster id present + typing.Tuple[typing.Type[ClusterObjects.Cluster]], + # Wildcard endpoint, Cluster + Attribute present + typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], + # Wildcard attribute id + typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], + # Concrete path + typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], + # Directly specified attribute path + ClusterAttribute.AttributePath + ]]] = None, dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[ typing.Union[ None, # Empty tuple, all wildcard @@ -1461,10 +1464,11 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing # Concrete path typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] ]]] = None, - eventNumberFilter: typing.Optional[int] = None, - returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None, - fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, - payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): + eventNumberFilter: typing.Optional[int] = None, + returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None, + fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, + payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD + ): ''' Read a list of attributes and/or events from a target node @@ -1534,33 +1538,41 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing eventPaths = [self._parseEventPathTuple( v) for v in events] if events else None - ClusterAttribute.Read(future=future, eventLoop=eventLoop, device=device.deviceProxy, devCtrl=self, + + transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject) + ClusterAttribute.Read(transaction, attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths, - eventNumberFilter=eventNumberFilter, returnClusterObject=returnClusterObject, + eventNumberFilter=eventNumberFilter, subscriptionParameters=ClusterAttribute.SubscriptionParameters( reportInterval[0], reportInterval[1]) if reportInterval else None, fabricFiltered=fabricFiltered, keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error() - return await future + await future - async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[ - None, # Empty tuple, all wildcard - typing.Tuple[int], # Endpoint - # Wildcard endpoint, Cluster id present - typing.Tuple[typing.Type[ClusterObjects.Cluster]], - # Wildcard endpoint, Cluster + Attribute present - typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], - # Wildcard attribute id - typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], - # Concrete path - typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], - # Directly specified attribute path - ClusterAttribute.AttributePath - ]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, - returnClusterObject: bool = False, - reportInterval: typing.Optional[typing.Tuple[int, int]] = None, - fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, - payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): + return transaction.GetReadResponse() + + async def ReadAttribute( + self, + nodeid: int, + attributes: typing.Optional[typing.List[typing.Union[ + None, # Empty tuple, all wildcard + typing.Tuple[int], # Endpoint + # Wildcard endpoint, Cluster id present + typing.Tuple[typing.Type[ClusterObjects.Cluster]], + # Wildcard endpoint, Cluster + Attribute present + typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]], + # Wildcard attribute id + typing.Tuple[int, typing.Type[ClusterObjects.Cluster]], + # Concrete path + typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]], + # Directly specified attribute path + ClusterAttribute.AttributePath + ]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, + returnClusterObject: bool = False, + reportInterval: typing.Optional[typing.Tuple[int, int]] = None, + fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True, + payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD + ): ''' Read a list of attributes from a target node, this is a wrapper of DeviceController.Read() @@ -1629,24 +1641,28 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.Li else: return res.attributes - async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[ - None, # Empty tuple, all wildcard - typing.Tuple[str, int], # all wildcard with urgency set - typing.Tuple[int, int], # Endpoint, - # Wildcard endpoint, Cluster id present - typing.Tuple[typing.Type[ClusterObjects.Cluster], int], - # Wildcard endpoint, Cluster + Event present - typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int], - # Wildcard event id - typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int], - # Concrete path - typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] - ]], eventNumberFilter: typing.Optional[int] = None, - fabricFiltered: bool = True, - reportInterval: typing.Optional[typing.Tuple[int, int]] = None, - keepSubscriptions: bool = False, - autoResubscribe: bool = True, - payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD): + async def ReadEvent( + self, + nodeid: int, + events: typing.List[typing.Union[ + None, # Empty tuple, all wildcard + typing.Tuple[str, int], # all wildcard with urgency set + typing.Tuple[int, int], # Endpoint, + # Wildcard endpoint, Cluster id present + typing.Tuple[typing.Type[ClusterObjects.Cluster], int], + # Wildcard endpoint, Cluster + Event present + typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int], + # Wildcard event id + typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int], + # Concrete path + typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int] + ]], eventNumberFilter: typing.Optional[int] = None, + fabricFiltered: bool = True, + reportInterval: typing.Optional[typing.Tuple[int, int]] = None, + keepSubscriptions: bool = False, + autoResubscribe: bool = True, + payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD + ): ''' Read a list of events from a target node, this is a wrapper of DeviceController.Read() diff --git a/src/controller/python/chip/clusters/Attribute.py b/src/controller/python/chip/clusters/Attribute.py index 84124814238969..2aaf0b428cf775 100644 --- a/src/controller/python/chip/clusters/Attribute.py +++ b/src/controller/python/chip/clusters/Attribute.py @@ -314,14 +314,17 @@ class AttributeCache: returnClusterObject: bool = False attributeTLVCache: Dict[int, Dict[int, Dict[int, bytes]]] = field( default_factory=lambda: {}) - attributeCache: Dict[int, List[Cluster]] = field( - default_factory=lambda: {}) versionList: Dict[int, Dict[int, Dict[int, int]]] = field( default_factory=lambda: {}) + _attributeCacheUpdateNeeded: set[AttributePath] = field( + default_factory=lambda: set()) + _attributeCache: Dict[int, List[Cluster]] = field( + default_factory=lambda: {}) + def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, ValueDecodeFailure]): ''' Store data in TLV since that makes it easiest to eventually convert to either the - cluster or attribute view representations (see below in UpdateCachedData). + cluster or attribute view representations (see below in GetUpdatedAttributeCache()). ''' if (path.EndpointId not in self.attributeTLVCache): self.attributeTLVCache[path.EndpointId] = {} @@ -344,7 +347,10 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V clusterCache[path.AttributeId] = data - def UpdateCachedData(self, changedPathSet: set[AttributePath]): + # For this path the attribute cache still requires an update. + self._attributeCacheUpdateNeeded.add(path) + + def GetUpdatedAttributeCache(self) -> Dict[int, List[Cluster]]: ''' This converts the raw TLV data into a cluster object format. Two formats are available: @@ -381,12 +387,12 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType): except Exception as ex: return ValueDecodeFailure(value, ex) - for attributePath in changedPathSet: + for attributePath in self._attributeCacheUpdateNeeded: endpointId, clusterId, attributeId = attributePath.EndpointId, attributePath.ClusterId, attributePath.AttributeId - if endpointId not in self.attributeCache: - self.attributeCache[endpointId] = {} - endpointCache = self.attributeCache[endpointId] + if endpointId not in self._attributeCache: + self._attributeCache[endpointId] = {} + endpointCache = self._attributeCache[endpointId] if clusterId not in _ClusterIndex: # @@ -414,6 +420,8 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType): attributeType = _AttributeIndex[(clusterId, attributeId)][0] clusterCache[attributeType] = handle_attribute_view(endpointId, clusterId, attributeId, attributeType) + self._attributeCacheUpdateNeeded.clear() + return self._attributeCache class SubscriptionTransaction: @@ -434,12 +442,12 @@ def __init__(self, transaction: AsyncReadTransaction, subscriptionId, devCtrl): def GetAttributes(self): ''' Returns the attribute value cache tracking the latest state on the publisher. ''' - return self._readTransaction._cache.attributeCache + return self._readTransaction._cache.GetUpdatedAttributeCache() def GetAttribute(self, path: TypedAttributePath) -> Any: ''' Returns a specific attribute given a TypedAttributePath. ''' - data = self._readTransaction._cache.attributeCache + data = self._readTransaction._cache.GetUpdatedAttributeCache() if (self._readTransaction._cache.returnClusterObject): return eval(f'data[path.Path.EndpointId][path.ClusterType].{path.AttributeName}') @@ -650,6 +658,14 @@ def SetClientObjPointers(self, pReadClient, pReadCallback): def GetAllEventValues(self): return self._events + def GetReadResponse(self) -> AsyncReadTransaction.ReadResponse: + """Prepares and returns the ReadResponse object.""" + return self.ReadResponse( + attributes=self._cache.GetUpdatedAttributeCache(), + events=self._events, + tlvAttributes=self._cache.attributeTLVCache + ) + def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes): try: imStatus = chip.interaction_model.Status(status) @@ -745,8 +761,6 @@ def _handleReportBegin(self): pass def _handleReportEnd(self): - self._cache.UpdateCachedData(self._changedPathSet) - if (self._subscription_handler is not None): for change in self._changedPathSet: try: @@ -772,8 +786,7 @@ def _handleDone(self): if self._resultError is not None: self._future.set_exception(self._resultError.to_exception()) else: - self._future.set_result(AsyncReadTransaction.ReadResponse( - attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache)) + self._future.set_result(self) # # Decrement the ref on ourselves to match the increment that happened at allocation. @@ -1001,9 +1014,9 @@ def WriteGroupAttributes(groupId: int, devCtrl: c_void_p, attributes: List[Attri ) -def Read(future: Future, eventLoop, device, devCtrl, +def Read(transaction: AsyncReadTransaction, attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None, - events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True, + events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError: if (not attributes) and dataVersionFilters: @@ -1011,8 +1024,6 @@ def Read(future: Future, eventLoop, device, devCtrl, "Must provide valid attribute list when data version filters is not null") handle = chip.native.GetLibraryHandle() - transaction = AsyncReadTransaction( - future, eventLoop, devCtrl, returnClusterObject) attributePathsForCffi = None if attributes is not None: @@ -1119,25 +1130,6 @@ def Read(future: Future, eventLoop, device, devCtrl, return res -def ReadAttributes(future: Future, eventLoop, device, devCtrl, - attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None, - returnClusterObject: bool = True, - subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int: - return Read(future=future, eventLoop=eventLoop, device=device, - devCtrl=devCtrl, attributes=attributes, dataVersionFilters=dataVersionFilters, - events=None, returnClusterObject=returnClusterObject, - subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered) - - -def ReadEvents(future: Future, eventLoop, device, devCtrl, - events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True, - subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int: - return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl, attributes=None, - dataVersionFilters=None, events=events, eventNumberFilter=eventNumberFilter, - returnClusterObject=returnClusterObject, - subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered) - - def Init(): handle = chip.native.GetLibraryHandle()