[Python] Process attribute cache updates in Python thread
Instead of processing attribute updates in the SDK thread, process
them on request in the Python thread. This avoids acks being sent back
to the device too late after the last DataReport when many attribute
updates are sent at once.

For now, the same data model and processing are still used. There is
certainly room for further optimization to make this more efficient.
agners committed Sep 12, 2024
1 parent d04a667 commit d66652c
Showing 2 changed files with 101 additions and 94 deletions.
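
The core idea described in the commit message above is to keep the SDK-thread report callback cheap: it only stores the raw TLV and marks the touched paths as needing an update, while the conversion to cluster objects is deferred to the Python thread and runs when a caller actually asks for the cache. Below is a minimal, self-contained sketch of that dirty-path pattern; the class and names are simplified stand-ins, not the actual chip.clusters code.

from dataclasses import dataclass, field
from typing import Any, Dict, Set, Tuple

# Hypothetical path key: (endpoint_id, cluster_id, attribute_id).
PathKey = Tuple[int, int, int]


@dataclass
class LazyAttributeCache:
    # Raw payloads as delivered by the report callback (cheap to store).
    raw: Dict[PathKey, bytes] = field(default_factory=dict)
    # Paths whose decoded view is stale and still needs rebuilding.
    dirty: Set[PathKey] = field(default_factory=set)
    # Decoded view, rebuilt lazily on request.
    decoded: Dict[PathKey, Any] = field(default_factory=dict)

    def update_raw(self, path: PathKey, payload: bytes) -> None:
        """Called from the SDK/report thread: store and mark dirty, nothing else."""
        self.raw[path] = payload
        self.dirty.add(path)

    def get_decoded(self) -> Dict[PathKey, Any]:
        """Called from the Python thread: decode only what changed, then clear."""
        for path in self.dirty:
            # Stand-in for the expensive TLV -> cluster object conversion.
            self.decoded[path] = self.raw[path].decode()
        self.dirty.clear()
        return self.decoded


cache = LazyAttributeCache()
cache.update_raw((1, 6, 0), b"on")      # fast path, runs per attribute report
cache.update_raw((1, 8, 0), b"128")
print(cache.get_decoded())              # decoding deferred until somebody asks

In the real change, UpdateTLV() plays the role of update_raw() and GetUpdatedAttributeCache() plays the role of get_decoded(); because the report callback returns quickly, the ack for the last DataReport is no longer held up by decoding work.
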
129 changes: 72 additions & 57 deletions src/controller/python/chip/ChipDeviceCtrl.py
@@ -1433,20 +1433,23 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[
else:
raise ValueError("Unsupported Attribute Path")

async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]] = None,
async def Read(
self,
nodeid: int,
attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]] = None,
dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[
typing.Union[
None, # Empty tuple, all wildcard
@@ -1461,10 +1464,11 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]]] = None,
eventNumberFilter: typing.Optional[int] = None,
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
eventNumberFilter: typing.Optional[int] = None,
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of attributes and/or events from a target node
@@ -1534,33 +1538,40 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
eventPaths = [self._parseEventPathTuple(
v) for v in events] if events else None

ClusterAttribute.Read(future=future, eventLoop=eventLoop, device=device.deviceProxy, devCtrl=self,
transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject)
ClusterAttribute.Read(transaction,
attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths,
eventNumberFilter=eventNumberFilter, returnClusterObject=returnClusterObject,
eventNumberFilter=eventNumberFilter,
subscriptionParameters=ClusterAttribute.SubscriptionParameters(
reportInterval[0], reportInterval[1]) if reportInterval else None,
fabricFiltered=fabricFiltered,
keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error()
return await future
await future

async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
returnClusterObject: bool = False,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
return transaction.GetReadResponse()

async def ReadAttribute(
self,
nodeid: int,
attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
returnClusterObject: bool = False,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of attributes from a target node, this is a wrapper of DeviceController.Read()
@@ -1629,24 +1640,28 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.Li
else:
return res.attributes

async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[str, int], # all wildcard with urgency set
typing.Tuple[int, int], # Endpoint,
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
# Wildcard endpoint, Cluster + Event present
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
# Wildcard event id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]], eventNumberFilter: typing.Optional[int] = None,
fabricFiltered: bool = True,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
keepSubscriptions: bool = False,
autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
async def ReadEvent(
self,
nodeid: int,
events: typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[str, int], # all wildcard with urgency set
typing.Tuple[int, int], # Endpoint,
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
# Wildcard endpoint, Cluster + Event present
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
# Wildcard event id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]], eventNumberFilter: typing.Optional[int] = None,
fabricFiltered: bool = True,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
keepSubscriptions: bool = False,
autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of events from a target node, this is a wrapper of DeviceController.Read()
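
The large Union type hints in ReadAttribute() above enumerate the path shapes the API accepts, from wildcarded to fully concrete paths. As a quick orientation, here is a hedged usage sketch of a few of those shapes; devCtrl and nodeid are assumed to come from an already commissioned setup, and the cluster classes are the standard ones from chip.clusters (illustrative only, not verified against a live device).

import chip.clusters as Clusters

async def read_examples(devCtrl, nodeid: int):
    # Concrete path: endpoint 1, OnOff cluster, OnOff attribute.
    res = await devCtrl.ReadAttribute(nodeid, [(1, Clusters.OnOff.Attributes.OnOff)])

    # Wildcard endpoint: the OnOff attribute on every endpoint that has it.
    res = await devCtrl.ReadAttribute(nodeid, [Clusters.OnOff.Attributes.OnOff])

    # Whole OnOff cluster on a specific endpoint.
    res = await devCtrl.ReadAttribute(nodeid, [(1, Clusters.OnOff)])

    # Whole OnOff cluster on every endpoint.
    res = await devCtrl.ReadAttribute(nodeid, [Clusters.OnOff])

    # Each call returns a nested mapping keyed endpoint -> cluster class -> attribute
    # class, built lazily by GetUpdatedAttributeCache() (see Attribute.py below).
    return res
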
66 changes: 29 additions & 37 deletions src/controller/python/chip/clusters/Attribute.py
@@ -314,14 +314,17 @@ class AttributeCache:
returnClusterObject: bool = False
attributeTLVCache: Dict[int, Dict[int, Dict[int, bytes]]] = field(
default_factory=lambda: {})
attributeCache: Dict[int, List[Cluster]] = field(
default_factory=lambda: {})
versionList: Dict[int, Dict[int, Dict[int, int]]] = field(
default_factory=lambda: {})

_attributeCacheUpdateNeeded: set[AttributePath] = field(
default_factory=lambda: set())
_attributeCache: Dict[int, List[Cluster]] = field(
default_factory=lambda: {})

def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, ValueDecodeFailure]):
''' Store data in TLV since that makes it easiest to eventually convert to either the
cluster or attribute view representations (see below in UpdateCachedData).
cluster or attribute view representations (see below in GetUpdatedAttributeCache()).
'''
if (path.EndpointId not in self.attributeTLVCache):
self.attributeTLVCache[path.EndpointId] = {}
@@ -344,7 +347,10 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V

clusterCache[path.AttributeId] = data

def UpdateCachedData(self, changedPathSet: set[AttributePath]):
# For this path the attribute cache still requires an update.
self._attributeCacheUpdateNeeded.add(path)

def GetUpdatedAttributeCache(self) -> Dict[int, List[Cluster]]:
''' This converts the raw TLV data into a cluster object format.
Two formats are available:
@@ -381,12 +387,12 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):
except Exception as ex:
return ValueDecodeFailure(value, ex)

for attributePath in changedPathSet:
for attributePath in self._attributeCacheUpdateNeeded:
endpointId, clusterId, attributeId = attributePath.EndpointId, attributePath.ClusterId, attributePath.AttributeId

if endpointId not in self.attributeCache:
self.attributeCache[endpointId] = {}
endpointCache = self.attributeCache[endpointId]
if endpointId not in self._attributeCache:
self._attributeCache[endpointId] = {}
endpointCache = self._attributeCache[endpointId]

if clusterId not in _ClusterIndex:
#
@@ -414,6 +420,8 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):

attributeType = _AttributeIndex[(clusterId, attributeId)][0]
clusterCache[attributeType] = handle_attribute_view(endpointId, clusterId, attributeId, attributeType)
self._attributeCacheUpdateNeeded.clear()
return self._attributeCache


class SubscriptionTransaction:
@@ -434,12 +442,12 @@ def __init__(self, transaction: AsyncReadTransaction, subscriptionId, devCtrl):
def GetAttributes(self):
''' Returns the attribute value cache tracking the latest state on the publisher.
'''
return self._readTransaction._cache.attributeCache
return self._readTransaction._cache.GetUpdatedAttributeCache()

def GetAttribute(self, path: TypedAttributePath) -> Any:
''' Returns a specific attribute given a TypedAttributePath.
'''
data = self._readTransaction._cache.attributeCache
data = self._readTransaction._cache.GetUpdatedAttributeCache()

if (self._readTransaction._cache.returnClusterObject):
return eval(f'data[path.Path.EndpointId][path.ClusterType].{path.AttributeName}')
@@ -650,6 +658,14 @@ def SetClientObjPointers(self, pReadClient, pReadCallback):
def GetAllEventValues(self):
return self._events

def GetReadResponse(self) -> AsyncReadTransaction.ReadResponse:
"""Prepares and returns the ReadResponse object."""
return self.ReadResponse(
attributes=self._cache.GetUpdatedAttributeCache(),
events=self._events,
tlvAttributes=self._cache.attributeTLVCache
)

def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
try:
imStatus = chip.interaction_model.Status(status)
@@ -745,8 +761,6 @@ def _handleReportBegin(self):
pass

def _handleReportEnd(self):
self._cache.UpdateCachedData(self._changedPathSet)

if (self._subscription_handler is not None):
for change in self._changedPathSet:
try:
@@ -772,8 +786,7 @@ def _handleDone(self):
if self._resultError is not None:
self._future.set_exception(self._resultError.to_exception())
else:
self._future.set_result(AsyncReadTransaction.ReadResponse(
attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache))
self._future.set_result(self)

#
# Decrement the ref on ourselves to match the increment that happened at allocation.
@@ -1001,18 +1014,16 @@ def WriteGroupAttributes(groupId: int, devCtrl: c_void_p, attributes: List[Attri
)


def Read(future: Future, eventLoop, device, devCtrl,
def Read(transaction: AsyncReadTransaction,
attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None,
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None,
subscriptionParameters: Optional[SubscriptionParameters] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError:
if (not attributes) and dataVersionFilters:
raise ValueError(
"Must provide valid attribute list when data version filters is not null")

handle = chip.native.GetLibraryHandle()
transaction = AsyncReadTransaction(
future, eventLoop, devCtrl, returnClusterObject)

attributePathsForCffi = None
if attributes is not None:
@@ -1119,25 +1130,6 @@ def Read(future: Future, eventLoop, device, devCtrl,
return res


def ReadAttributes(future: Future, eventLoop, device, devCtrl,
attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None,
returnClusterObject: bool = True,
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
return Read(future=future, eventLoop=eventLoop, device=device,
devCtrl=devCtrl, attributes=attributes, dataVersionFilters=dataVersionFilters,
events=None, returnClusterObject=returnClusterObject,
subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)


def ReadEvents(future: Future, eventLoop, device, devCtrl,
events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True,
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl, attributes=None,
dataVersionFilters=None, events=events, eventNumberFilter=eventNumberFilter,
returnClusterObject=returnClusterObject,
subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)


def Init():
handle = chip.native.GetLibraryHandle()

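
Putting the Attribute.py pieces together: _handleDone() now resolves the future with the transaction itself, and GetReadResponse() later builds the ReadResponse (including the TLV-to-cluster-object conversion via GetUpdatedAttributeCache()) on the Python side. The following self-contained sketch models that hand-off between an SDK-style worker thread and the asyncio loop; the class and method names are simplified stand-ins, not the actual chip.clusters.Attribute API.

import asyncio
import threading

class FakeReadTransaction:
    def __init__(self, future: asyncio.Future, loop: asyncio.AbstractEventLoop):
        self._future = future
        self._loop = loop
        self._raw = {}

    def handle_attribute_data(self, path, payload: bytes) -> None:
        # Runs on the SDK thread: store the raw report, no decoding here.
        self._raw[path] = payload

    def handle_done(self) -> None:
        # Runs on the SDK thread: hand the whole transaction back to the loop.
        self._loop.call_soon_threadsafe(self._future.set_result, self)

    def get_read_response(self) -> dict:
        # Runs on the asyncio thread after `await future` returns.
        return {path: payload.decode() for path, payload in self._raw.items()}


async def read():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    transaction = FakeReadTransaction(future, loop)

    def sdk_thread():
        transaction.handle_attribute_data((1, 6, 0), b"on")
        transaction.handle_done()

    threading.Thread(target=sdk_thread).start()
    await future                        # resolves with the transaction itself
    return transaction.get_read_response()


print(asyncio.run(read()))              # {(1, 6, 0): 'on'}
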
