Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Python] Process attribute cache updates in Python thread #35557

Merged
merged 3 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 75 additions & 57 deletions src/controller/python/chip/ChipDeviceCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1433,20 +1433,23 @@ def _parseEventPathTuple(self, pathTuple: typing.Union[
else:
raise ValueError("Unsupported Attribute Path")

async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]] = None,
async def Read(
self,
nodeid: int,
attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]] = None,
dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None, events: typing.Optional[typing.List[
typing.Union[
None, # Empty tuple, all wildcard
Expand All @@ -1461,10 +1464,11 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]]] = None,
eventNumberFilter: typing.Optional[int] = None,
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
eventNumberFilter: typing.Optional[int] = None,
returnClusterObject: bool = False, reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of attributes and/or events from a target node

Expand Down Expand Up @@ -1534,33 +1538,43 @@ async def Read(self, nodeid: int, attributes: typing.Optional[typing.List[typing
eventPaths = [self._parseEventPathTuple(
v) for v in events] if events else None

ClusterAttribute.Read(future=future, eventLoop=eventLoop, device=device.deviceProxy, devCtrl=self,
transaction = ClusterAttribute.AsyncReadTransaction(future, eventLoop, self, returnClusterObject)
ClusterAttribute.Read(transaction, device=device.deviceProxy,
attributes=attributePaths, dataVersionFilters=clusterDataVersionFilters, events=eventPaths,
eventNumberFilter=eventNumberFilter, returnClusterObject=returnClusterObject,
eventNumberFilter=eventNumberFilter,
subscriptionParameters=ClusterAttribute.SubscriptionParameters(
reportInterval[0], reportInterval[1]) if reportInterval else None,
fabricFiltered=fabricFiltered,
keepSubscriptions=keepSubscriptions, autoResubscribe=autoResubscribe).raise_on_error()
return await future
await future

async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
returnClusterObject: bool = False,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
if result := transaction.GetSubscriptionHandler():
return result
else:
return transaction.GetReadResponse()

async def ReadAttribute(
self,
nodeid: int,
attributes: typing.Optional[typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[int], # Endpoint
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster]],
# Wildcard endpoint, Cluster + Attribute present
typing.Tuple[typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Wildcard attribute id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster]],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterAttributeDescriptor]],
# Directly specified attribute path
ClusterAttribute.AttributePath
]]], dataVersionFilters: typing.Optional[typing.List[typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int]]] = None,
returnClusterObject: bool = False,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of attributes from a target node, this is a wrapper of DeviceController.Read()

Expand Down Expand Up @@ -1629,24 +1643,28 @@ async def ReadAttribute(self, nodeid: int, attributes: typing.Optional[typing.Li
else:
return res.attributes

async def ReadEvent(self, nodeid: int, events: typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[str, int], # all wildcard with urgency set
typing.Tuple[int, int], # Endpoint,
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
# Wildcard endpoint, Cluster + Event present
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
# Wildcard event id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]], eventNumberFilter: typing.Optional[int] = None,
fabricFiltered: bool = True,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
keepSubscriptions: bool = False,
autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD):
async def ReadEvent(
self,
nodeid: int,
events: typing.List[typing.Union[
None, # Empty tuple, all wildcard
typing.Tuple[str, int], # all wildcard with urgency set
typing.Tuple[int, int], # Endpoint,
# Wildcard endpoint, Cluster id present
typing.Tuple[typing.Type[ClusterObjects.Cluster], int],
# Wildcard endpoint, Cluster + Event present
typing.Tuple[typing.Type[ClusterObjects.ClusterEvent], int],
# Wildcard event id
typing.Tuple[int, typing.Type[ClusterObjects.Cluster], int],
# Concrete path
typing.Tuple[int, typing.Type[ClusterObjects.ClusterEvent], int]
]], eventNumberFilter: typing.Optional[int] = None,
fabricFiltered: bool = True,
reportInterval: typing.Optional[typing.Tuple[int, int]] = None,
keepSubscriptions: bool = False,
autoResubscribe: bool = True,
payloadCapability: int = TransportPayloadCapability.MRP_PAYLOAD
):
'''
Read a list of events from a target node, this is a wrapper of DeviceController.Read()

Expand Down
72 changes: 34 additions & 38 deletions src/controller/python/chip/clusters/Attribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,14 +314,17 @@ class AttributeCache:
returnClusterObject: bool = False
attributeTLVCache: Dict[int, Dict[int, Dict[int, bytes]]] = field(
default_factory=lambda: {})
attributeCache: Dict[int, List[Cluster]] = field(
default_factory=lambda: {})
versionList: Dict[int, Dict[int, Dict[int, int]]] = field(
default_factory=lambda: {})

_attributeCacheUpdateNeeded: set[AttributePath] = field(
default_factory=lambda: set())
_attributeCache: Dict[int, List[Cluster]] = field(
default_factory=lambda: {})

def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, ValueDecodeFailure]):
''' Store data in TLV since that makes it easiest to eventually convert to either the
cluster or attribute view representations (see below in UpdateCachedData).
cluster or attribute view representations (see below in GetUpdatedAttributeCache()).
'''
if (path.EndpointId not in self.attributeTLVCache):
self.attributeTLVCache[path.EndpointId] = {}
Expand All @@ -344,7 +347,10 @@ def UpdateTLV(self, path: AttributePath, dataVersion: int, data: Union[bytes, V

clusterCache[path.AttributeId] = data

def UpdateCachedData(self, changedPathSet: set[AttributePath]):
# For this path the attribute cache still requires an update.
self._attributeCacheUpdateNeeded.add(path)

def GetUpdatedAttributeCache(self) -> Dict[int, List[Cluster]]:
''' This converts the raw TLV data into a cluster object format.

Two formats are available:
Expand Down Expand Up @@ -381,12 +387,12 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):
except Exception as ex:
return ValueDecodeFailure(value, ex)

for attributePath in changedPathSet:
for attributePath in self._attributeCacheUpdateNeeded:
endpointId, clusterId, attributeId = attributePath.EndpointId, attributePath.ClusterId, attributePath.AttributeId

if endpointId not in self.attributeCache:
self.attributeCache[endpointId] = {}
endpointCache = self.attributeCache[endpointId]
if endpointId not in self._attributeCache:
self._attributeCache[endpointId] = {}
endpointCache = self._attributeCache[endpointId]

if clusterId not in _ClusterIndex:
#
Expand Down Expand Up @@ -414,6 +420,8 @@ def handle_attribute_view(endpointId, clusterId, attributeId, attributeType):

attributeType = _AttributeIndex[(clusterId, attributeId)][0]
clusterCache[attributeType] = handle_attribute_view(endpointId, clusterId, attributeId, attributeType)
self._attributeCacheUpdateNeeded.clear()
return self._attributeCache


class SubscriptionTransaction:
Expand All @@ -434,12 +442,12 @@ def __init__(self, transaction: AsyncReadTransaction, subscriptionId, devCtrl):
def GetAttributes(self):
''' Returns the attribute value cache tracking the latest state on the publisher.
'''
return self._readTransaction._cache.attributeCache
return self._readTransaction._cache.GetUpdatedAttributeCache()

def GetAttribute(self, path: TypedAttributePath) -> Any:
''' Returns a specific attribute given a TypedAttributePath.
'''
data = self._readTransaction._cache.attributeCache
data = self._readTransaction._cache.GetUpdatedAttributeCache()

if (self._readTransaction._cache.returnClusterObject):
return eval(f'data[path.Path.EndpointId][path.ClusterType].{path.AttributeName}')
Expand Down Expand Up @@ -650,6 +658,18 @@ def SetClientObjPointers(self, pReadClient, pReadCallback):
def GetAllEventValues(self):
return self._events

def GetReadResponse(self) -> AsyncReadTransaction.ReadResponse:
"""Prepares and returns the ReadResponse object."""
return self.ReadResponse(
attributes=self._cache.GetUpdatedAttributeCache(),
events=self._events,
tlvAttributes=self._cache.attributeTLVCache
)

def GetSubscriptionHandler(self) -> SubscriptionTransaction | None:
"""Returns subscription transaction."""
return self._subscription_handler

def handleAttributeData(self, path: AttributePath, dataVersion: int, status: int, data: bytes):
try:
imStatus = chip.interaction_model.Status(status)
Expand Down Expand Up @@ -716,7 +736,7 @@ def _handleSubscriptionEstablished(self, subscriptionId):
if not self._future.done():
self._subscription_handler = SubscriptionTransaction(
self, subscriptionId, self._devCtrl)
self._future.set_result(self._subscription_handler)
self._future.set_result(self)
else:
self._subscription_handler._subscriptionId = subscriptionId
if self._subscription_handler._onResubscriptionSucceededCb is not None:
Expand Down Expand Up @@ -745,8 +765,6 @@ def _handleReportBegin(self):
pass

def _handleReportEnd(self):
self._cache.UpdateCachedData(self._changedPathSet)

if (self._subscription_handler is not None):
for change in self._changedPathSet:
try:
Expand All @@ -772,8 +790,7 @@ def _handleDone(self):
if self._resultError is not None:
self._future.set_exception(self._resultError.to_exception())
else:
self._future.set_result(AsyncReadTransaction.ReadResponse(
attributes=self._cache.attributeCache, events=self._events, tlvAttributes=self._cache.attributeTLVCache))
self._future.set_result(self)

#
# Decrement the ref on ourselves to match the increment that happened at allocation.
Expand Down Expand Up @@ -1001,18 +1018,16 @@ def WriteGroupAttributes(groupId: int, devCtrl: c_void_p, attributes: List[Attri
)


def Read(future: Future, eventLoop, device, devCtrl,
def Read(transaction: AsyncReadTransaction, device,
attributes: Optional[List[AttributePath]] = None, dataVersionFilters: Optional[List[DataVersionFilter]] = None,
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None, returnClusterObject: bool = True,
events: Optional[List[EventPath]] = None, eventNumberFilter: Optional[int] = None,
subscriptionParameters: Optional[SubscriptionParameters] = None,
fabricFiltered: bool = True, keepSubscriptions: bool = False, autoResubscribe: bool = True) -> PyChipError:
if (not attributes) and dataVersionFilters:
raise ValueError(
"Must provide valid attribute list when data version filters is not null")

handle = chip.native.GetLibraryHandle()
transaction = AsyncReadTransaction(
future, eventLoop, devCtrl, returnClusterObject)

attributePathsForCffi = None
if attributes is not None:
Expand Down Expand Up @@ -1119,25 +1134,6 @@ def Read(future: Future, eventLoop, device, devCtrl,
return res


def ReadAttributes(future: Future, eventLoop, device, devCtrl,
arkq marked this conversation as resolved.
Show resolved Hide resolved
attributes: List[AttributePath], dataVersionFilters: Optional[List[DataVersionFilter]] = None,
returnClusterObject: bool = True,
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
return Read(future=future, eventLoop=eventLoop, device=device,
devCtrl=devCtrl, attributes=attributes, dataVersionFilters=dataVersionFilters,
events=None, returnClusterObject=returnClusterObject,
subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)


def ReadEvents(future: Future, eventLoop, device, devCtrl,
events: List[EventPath], eventNumberFilter=None, returnClusterObject: bool = True,
subscriptionParameters: Optional[SubscriptionParameters] = None, fabricFiltered: bool = True) -> int:
return Read(future=future, eventLoop=eventLoop, device=device, devCtrl=devCtrl, attributes=None,
dataVersionFilters=None, events=events, eventNumberFilter=eventNumberFilter,
returnClusterObject=returnClusterObject,
subscriptionParameters=subscriptionParameters, fabricFiltered=fabricFiltered)


def Init():
handle = chip.native.GetLibraryHandle()

Expand Down
9 changes: 5 additions & 4 deletions src/controller/python/test/test_scripts/cluster_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,12 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction):
sub.SetAttributeUpdateCallback(subUpdate)

try:
data = sub.GetAttributes()
req = Clusters.OnOff.Commands.On()
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req)

await asyncio.wait_for(event.wait(), timeout=11)

data = sub.GetAttributes()
if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 1):
raise ValueError("Current On/Off state should be 1")

Expand All @@ -232,6 +232,7 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction):

await asyncio.wait_for(event.wait(), timeout=11)

data = sub.GetAttributes()
if (data[1][Clusters.OnOff][Clusters.OnOff.Attributes.OnOff] != 0):
raise ValueError("Current On/Off state should be 0")

Expand All @@ -254,13 +255,12 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction):
sub.SetAttributeUpdateCallback(subUpdate)

try:
data = sub.GetAttributes()

req = Clusters.OnOff.Commands.On()
await devCtrl.SendCommand(nodeid=NODE_ID, endpoint=1, payload=req)

await asyncio.wait_for(event.wait(), timeout=11)

data = sub.GetAttributes()
cluster: Clusters.OnOff = data[1][Clusters.OnOff]
if (not cluster.onOff):
raise ValueError("Current On/Off state should be True")
Expand All @@ -272,6 +272,7 @@ def subUpdate(path: TypedAttributePath, transaction: SubscriptionTransaction):

await asyncio.wait_for(event.wait(), timeout=11)

data = sub.GetAttributes()
cluster: Clusters.OnOff = data[1][Clusters.OnOff]
if (cluster.onOff):
raise ValueError("Current On/Off state should be False")
Expand All @@ -298,7 +299,6 @@ async def TestSubscribeZeroMinInterval(cls, devCtrl):
logger.info("Test Subscription With MinInterval of 0")
sub = await devCtrl.ReadAttribute(nodeid=NODE_ID,
attributes=[Clusters.OnOff, Clusters.LevelControl], reportInterval=(0, 60))
data = sub.GetAttributes()

logger.info("Sending off command")

Expand All @@ -315,6 +315,7 @@ async def TestSubscribeZeroMinInterval(cls, devCtrl):

logger.info("Checking read back value is indeed 254")

data = sub.GetAttributes()
if (data[1][Clusters.LevelControl][Clusters.LevelControl.Attributes.CurrentLevel] != 254):
raise ValueError("Current Level should have been 254")

Expand Down
Loading