forked from eclipse-velocitas/vehicle-app-python-sdk
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsubscriptions.py
136 lines (118 loc) · 4.66 KB
/
subscriptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Copyright (c) 2022-2024 Contributors to the Eclipse Foundation
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0.
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
import grpc
from velocitas_sdk.vdb.reply import DataPointReply
logger = logging.getLogger(__name__)
class SubscriptionManager:
"""Helper for subscription handling."""
_subscription_tasks = {} # type: ignore
@staticmethod
async def remove_all_subscriptions():
for task in SubscriptionManager._subscription_tasks.items():
try:
await SubscriptionManager._remove_subscription(task[0])
except Exception as ex:
logger.exception(ex)
@staticmethod
def list_all_subscription():
queries = []
for task in SubscriptionManager._subscription_tasks.items():
if not (task[1].cancelled() or task[1].done()):
queries.append(task[1].get_name())
return queries
@staticmethod
async def _remove_subscription(vdb_sub):
try:
task = SubscriptionManager._subscription_tasks[vdb_sub]
if not task.cancelled():
task.cancel()
await task
except asyncio.CancelledError:
logger.info(
"Unsubscribed from %s",
task.get_name(),
)
return task
except Exception as ex:
logger.debug("Task status -> %s", task)
logger.exception(ex)
raise
@staticmethod
def _add_subscription(vdb_sub):
try:
task = asyncio.create_task(
SubscriptionManager._subscribe_to_data_points_forever(vdb_sub),
name=vdb_sub.query,
)
SubscriptionManager._subscription_tasks[vdb_sub] = task
logger.info("Subscribing to %s", vdb_sub.query)
return task
except (grpc.aio.AioRpcError, Exception): # type: ignore
logger.exception("Error occured in SubscriptionManager._add_subscription.")
raise
# @retry((grpc.aio.AioRpcError), delay=2)
@staticmethod
async def _subscribe_to_data_points(vdb_sub):
try:
async for reply in vdb_sub.vdb_client.Subscribe(vdb_sub.query):
reply_wrapper = DataPointReply(reply)
if asyncio.iscoroutinefunction(vdb_sub.call_back):
await vdb_sub.call_back(reply_wrapper)
else:
vdb_sub.call_back(reply_wrapper)
except (grpc.aio.AioRpcError, Exception): # type: ignore
logger.exception(
"Error occured in SubscriptionManager.subscribe_to_data_points."
)
raise
@staticmethod
async def _subscribe_to_data_points_forever(vdb_sub):
while True:
try:
await SubscriptionManager._subscribe_to_data_points(vdb_sub)
except (grpc.aio.AioRpcError, Exception) as ex: # type: ignore
logger.debug(
"Error in subscription -> {Subscription: %s}",
SubscriptionManager._subscription_tasks[vdb_sub],
)
logger.exception(ex)
if isinstance(ex, (grpc.aio.AioRpcError)): # type: ignore
if ex.code() is grpc.StatusCode.INVALID_ARGUMENT:
raise
logger.debug("Retrying after 2.5 seconds")
await asyncio.sleep(2.5)
else:
raise
class VdbSubscription:
"""Expose subscription handling to client."""
def __init__(self, vdb_client=None, query=None, call_back=None):
self.query = query
self.vdb_client = vdb_client
self.call_back = call_back
async def unsubscribe(self):
try:
task = await SubscriptionManager._remove_subscription(self)
return task
except Exception as ex:
logger.exception(ex)
async def subscribe(self):
try:
task = SubscriptionManager._subscription_tasks[self]
if task.cancelled():
SubscriptionManager._add_subscription(self)
return task
except Exception as ex:
logger.exception(ex)