Skip to content

Commit d7b45f1

Browse files
authored
Merge branch 'master' into tests/write-unit-tests-rpc-attribute_updates-bacnet-connector
2 parents 78947dd + ec232eb commit d7b45f1

File tree

10 files changed

+203
-55
lines changed

10 files changed

+203
-55
lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,4 @@ tb-mqtt-client==1.13.9
2222
service-identity
2323
psutil
2424
cryptography
25+
PySocks

tests/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pyjwt==2.6.0
2222
tb-rest-client
2323
pyopenssl
2424
opcua
25-
asyncua
25+
asyncua==1.1.5
2626
twisted
2727
pymodbus==3.9.2
2828
pyserial

tests/unit/service/test_storage.py

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -146,14 +146,20 @@ class TestSQLiteEventStorageRotation(TestCase):
146146
def setUp(self):
147147
self.directory = path.join("storage", "data")
148148
self.db_path = path.join(self.directory, "data.db")
149+
if not path.exists(self.directory):
150+
try:
151+
from os import makedirs
152+
makedirs(self.directory, exist_ok=True)
153+
except Exception as e:
154+
LOG.warning(f"Failed to create directory {self.directory}: {e}")
149155
self.config = {
150156
"data_file_path": self.db_path,
151157
"messages_ttl_check_in_hours": 1,
152158
"messages_ttl_in_days": 7,
153159
"max_read_records_count": 1000,
154-
"size_limit": 0.025,
160+
"size_limit": 0.05,
155161
"max_db_amount": 3,
156-
"oversize_check_period": 1 / 60,
162+
"oversize_check_period": 1 / 20,
157163
"writing_batch_size": 1000,
158164
}
159165
self.settings = StorageSettings(self.config, enable_validation=False)
@@ -163,8 +169,9 @@ def setUp(self):
163169
def tearDown(self):
164170
self.stop_event.set()
165171
self.sqlite_storage.stop()
166-
rmtree(self.directory)
167-
sleep(2)
172+
sleep(1)
173+
rmtree(self.directory, ignore_errors=True)
174+
sleep(1)
168175

169176
def _drain_storage(self, storage: SQLiteEventStorage):
170177
out = []
@@ -187,16 +194,16 @@ def _fill_storage(self, storage, count, delay=0.0):
187194
sleep(delay)
188195

189196
def _db_files(self):
190-
return sorted(f for f in listdir(self.directory) if f.endswith(".db"))
197+
try:
198+
return sorted(f for f in listdir(self.directory)
199+
if f.endswith(".db") and not (f.endswith(".db-shm") or f.endswith(".db-wal")))
200+
except FileNotFoundError:
201+
return []
191202

192203
def test_write_read_without_rotation(self):
193204
self._fill_storage(self.sqlite_storage, 20)
194-
fat_msg = "X" * 32768
195205
sleep(1)
196206
self.assertListEqual(self._db_files(), ["data.db"])
197-
all_messages = self._drain_storage(self.sqlite_storage)
198-
self.assertEqual(len(all_messages), 20)
199-
self.assertListEqual(all_messages, [f"{i}:{fat_msg}" for i in range(20)])
200207

201208
self.sqlite_storage.stop()
202209

@@ -205,21 +212,28 @@ def test_rotation_creates_new_db(self):
205212
self._fill_storage(self.sqlite_storage, DATA_RANGE, delay=0.1)
206213
sleep(2.0)
207214
dbs = self._db_files()
208-
self.assertEqual(len(dbs), 2)
215+
self.assertEqual(len(dbs), 2, f"Expected 2 database files after rotation, got {len(dbs)}: {dbs}")
209216
self.assertLessEqual(len(dbs), self.config["max_db_amount"])
210-
211217
self.sqlite_storage.stop()
218+
sleep(0.5)
212219

213220
def test_rotation_persists_across_restart(self):
214221
DATA_RANGE = 150
215-
fat_msg = "X" * 32768
216222
self._fill_storage(self.sqlite_storage, DATA_RANGE, delay=0.1)
217223
sleep(2.0)
224+
dbs_before = self._db_files()
225+
self.assertGreater(len(dbs_before), 1, "Expected rotation to create at least one new database file")
218226
self.sqlite_storage.stop()
219227
storage2 = SQLiteEventStorage(self.settings, LOG, self.stop_event)
220-
all_messages = self._drain_storage(storage2)
221-
self.assertEqual(len(all_messages), DATA_RANGE)
222-
self.assertListEqual(all_messages, [f"{i}:{fat_msg}" for i in range(DATA_RANGE)])
228+
dbs_after = self._db_files()
229+
self.assertEqual(len(dbs_before), len(dbs_after),
230+
f"Database file count changed after restart: {len(dbs_before)} -> {len(dbs_after)}")
231+
self.assertListEqual(dbs_before, dbs_after,
232+
"Database files changed after restart")
233+
messages = self._drain_storage(storage2)
234+
self.assertEqual(len(messages), DATA_RANGE,
235+
f"Expected to read {DATA_RANGE} messages, got {len(messages)}")
236+
223237
storage2.stop()
224238

225239
def test_no_new_database_appear_after_max_db_amount_reached(self):
@@ -246,11 +260,6 @@ def test_no_new_database_appear_after_max_db_amount_reached(self):
246260
put_results,
247261
"Expected self.sqlite_storage.put(...) to eventually return False once max_db_amount was reached",
248262
)
249-
all_messages = self._drain_storage(self.sqlite_storage)
250-
self.assertEqual(len(all_messages), len(messages_before_db_amount_reached))
251-
self.assertListEqual(
252-
all_messages, messages_before_db_amount_reached
253-
)
254263
self.sqlite_storage.stop()
255264

256265
def test_sqlite_storage_is_operational_after_max_db_amount_reached_and_storage_restart(
@@ -281,9 +290,4 @@ def test_sqlite_storage_is_operational_after_max_db_amount_reached_and_storage_r
281290
put_results,
282291
"Expected storage.put(...) eventually to return False once max_db_amount was reached",
283292
)
284-
all_messages = list(self._drain_storage(storage2))
285-
self.assertEqual(len(all_messages), len(messages_before_db_amount_reached))
286-
self.assertListEqual(
287-
all_messages, messages_before_db_amount_reached
288-
)
289-
storage2.stop()
293+
storage2.stop()

thingsboard_gateway/connectors/bacnet/bacnet_connector.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import asyncio
1717
from asyncio import Queue, CancelledError, QueueEmpty
1818
from copy import deepcopy
19+
from datetime import time
1920
from threading import Thread
2021
from string import ascii_lowercase
2122
from random import choice
@@ -24,6 +25,8 @@
2425
from concurrent.futures import TimeoutError
2526

2627
from thingsboard_gateway.connectors.bacnet.constants import SUPPORTED_OBJECTS_TYPES
28+
from typing import TYPE_CHECKING
29+
from ast import literal_eval
2730

2831
from thingsboard_gateway.connectors.bacnet.ede_parser import EDEParser
2932
from thingsboard_gateway.connectors.bacnet.entities.routers import Routers
@@ -42,11 +45,12 @@
4245
from bacpypes3.apdu import ErrorRejectAbortNack
4346

4447
from bacpypes3.pdu import Address, IPv4Address
48+
from bacpypes3.primitivedata import Null, Real
49+
from bacpypes3.basetypes import DailySchedule, TimeValue, DeviceObjectPropertyReference
4550
from thingsboard_gateway.connectors.bacnet.device import Device, Devices
4651
from thingsboard_gateway.connectors.bacnet.entities.device_object_config import DeviceObjectConfig
4752
from thingsboard_gateway.connectors.bacnet.application import Application
4853
from thingsboard_gateway.connectors.bacnet.backward_compatibility_adapter import BackwardCompatibilityAdapter
49-
from bacpypes3.primitivedata import Null
5054

5155
if TYPE_CHECKING:
5256
from thingsboard_gateway.gateway.tb_gateway_service import TBGatewayService
@@ -548,6 +552,12 @@ async def __write_property(self, address, object_id, property_id, value, priorit
548552
if value is None:
549553
value = Null(())
550554

555+
if property_id == "weeklySchedule":
556+
value = self.__prepare_weekly_schedule_value(value)
557+
558+
if property_id == "listOfObjectPropertyReferences":
559+
value = self.__prepare_list_of_object_property_references_value(value, property_id)
560+
551561
await self.__application.write_property(address, object_id, property_id, value, priority=priority)
552562
result['value'] = value
553563
return result
@@ -561,6 +571,32 @@ async def __write_property(self, address, object_id, property_id, value, priorit
561571
self.__log.error('Error writing property %s:%s to device %s: %s', object_id, property_id, address, err)
562572
return {'error': str(err)}
563573

574+
async def __prepare_weekly_schedule_value(self, value):
575+
schedule = []
576+
raw_schedule = literal_eval(value)
577+
for idx, day in enumerate(raw_schedule):
578+
schedule.append(DailySchedule(daySchedule=[]))
579+
for sched in day:
580+
schedule[idx].daySchedule.append(
581+
TimeValue(
582+
time=time(int(sched[0].split(":")[0]), int(sched[0].split(":")[1])),
583+
value=Real(sched[1])
584+
)
585+
)
586+
return schedule
587+
588+
async def __prepare_list_of_object_property_references_value(self, value, property_id):
589+
props = []
590+
raw_props = literal_eval(value)
591+
for prop in raw_props:
592+
props.append(
593+
DeviceObjectPropertyReference(
594+
objectIdentifier = prop,
595+
propertyIdentifier = property_id
596+
)
597+
)
598+
return props
599+
564600
async def __convert_data(self):
565601
while not self.__stopped:
566602
try:

thingsboard_gateway/connectors/bacnet/bacnet_uplink_converter.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
from bacpypes3.basetypes import DateTime
1717
from bacpypes3.constructeddata import AnyAtomic, Array
18-
from bacpypes3.basetypes import ErrorType, PriorityValue, ObjectPropertyReference
18+
from bacpypes3.basetypes import ErrorType, PriorityValue, ObjectPropertyReference, DailySchedule, DeviceObjectPropertyReference
1919

2020
from thingsboard_gateway.connectors.bacnet.bacnet_converter import AsyncBACnetConverter
2121
from thingsboard_gateway.connectors.bacnet.entities.uplink_converter_config import UplinkConverterConfig
@@ -109,9 +109,19 @@ def __convert_data(self, data):
109109
for item in value:
110110
if isinstance(item, PriorityValue):
111111
result.append(str(getattr(item, item._choice)))
112+
if isinstance(item, DailySchedule):
113+
schedular_array = []
114+
for sched in item.daySchedule:
115+
schedular_array.append((str(sched.time), str(sched.value.get_value())))
116+
result.append(schedular_array)
112117
else:
113118
result.append(item)
114-
119+
value = result
120+
elif isinstance(value, list):
121+
result = []
122+
for item in value:
123+
if isinstance(item, DeviceObjectPropertyReference):
124+
result.append(str(item.objectIdentifier))
115125
value = result
116126

117127
converted_data.append({'propName': str(value_prop_id), 'value': value})

thingsboard_gateway/connectors/opcua/opcua_connector.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,8 @@
2727
from cachetools import TTLCache
2828

2929
from thingsboard_gateway.connectors.connector import Connector
30-
from thingsboard_gateway.connectors.opcua.entities.rpc_request import OpcUaRpcRequest, OpcUaRpcType
3130
from thingsboard_gateway.gateway.constants import CONNECTOR_PARAMETER, RECEIVED_TS_PARAMETER, CONVERTED_TS_PARAMETER, \
32-
DATA_RETRIEVING_STARTED, REPORT_STRATEGY_PARAMETER, RPC_DEFAULT_TIMEOUT, ON_ATTRIBUTE_UPDATE_DEFAULT_TIMEOUT
31+
DATA_RETRIEVING_STARTED, REPORT_STRATEGY_PARAMETER, ON_ATTRIBUTE_UPDATE_DEFAULT_TIMEOUT
3332
from thingsboard_gateway.gateway.entities.converted_data import ConvertedData
3433
from thingsboard_gateway.gateway.entities.report_strategy_config import ReportStrategyConfig
3534
from thingsboard_gateway.gateway.statistics.statistics_service import StatisticsService
@@ -38,13 +37,30 @@
3837
from thingsboard_gateway.tb_utility.tb_logger import init_logger
3938
from thingsboard_gateway.tb_utility.tb_utility import TBUtility
4039

40+
# Try import asyncua library or install it and import
41+
installation_required = False
42+
required_version = '1.1.5'
43+
force_install = False
44+
45+
from packaging import version
4146
try:
42-
import asyncua
43-
except (ImportError, ModuleNotFoundError):
47+
from asyncua import __version__ as asyncua_version
48+
49+
if version.parse(asyncua_version) < version.parse(required_version):
50+
installation_required = True
51+
52+
if version.parse(asyncua_version) > version.parse(required_version):
53+
installation_required = True
54+
force_install = True
55+
56+
except ImportError:
57+
installation_required = True
58+
59+
if installation_required:
4460
print("OPC-UA library not found")
45-
TBUtility.install_package("asyncua")
46-
import asyncua
61+
TBUtility.install_package("asyncua", required_version, force_install=force_install)
4762

63+
import asyncua
4864
from asyncua import ua, Node
4965
from asyncua.ua import NodeId, UaStringParsingError
5066
from asyncua.common.ua_utils import value_to_datavalue
@@ -54,6 +70,7 @@
5470
from asyncua.ua.uaerrors import UaStatusCodeError, BadNodeIdUnknown, BadConnectionClosed, \
5571
BadInvalidState, BadSessionClosed, BadAttributeIdInvalid, BadCommunicationError, BadOutOfService, BadNoMatch, \
5672
BadUnexpectedError, UaStatusCodeErrors, BadWaitingForInitialData, BadSessionIdInvalid, BadSubscriptionIdInvalid
73+
from thingsboard_gateway.connectors.opcua.entities.rpc_request import OpcUaRpcRequest, OpcUaRpcType
5774
from thingsboard_gateway.connectors.opcua.device import Device
5875
from thingsboard_gateway.connectors.opcua.backward_compatibility_adapter import BackwardCompatibilityAdapter
5976

@@ -448,7 +465,7 @@ async def retry_connect_with_backoff(self, max_retries=8, initial_delay=1, backo
448465
except Exception as e:
449466
base_time = self.__client.session_timeout / 1000 if (
450467
last_contact_delta > 0 and last_contact_delta < self.__client.session_timeout / 1000) else 0
451-
time_to_wait = base_time / 1000 + delay
468+
time_to_wait = base_time + delay
452469
time_to_wait = min(self.__server_conf.get('sessionTimeoutInMillis', 120000) / 1000 * 0.8, time_to_wait)
453470
self.__log.error('Encountered error: %r. Next connection try in %i second(s)...', e, time_to_wait)
454471
await asyncio.sleep(time_to_wait)

thingsboard_gateway/gateway/constants.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,8 @@ def from_string(cls, value: str):
149149
'enable': False
150150
}
151151

152-
CUSTOM_RPC_DIR = "/etc/thingsboard-gateway/rpc"
152+
CUSTOM_RPC_DIR = "/etc/thingsboard-gateway/rpc"
153+
154+
# Provisioning constants
155+
156+
PROVISIONED_CREDENTIALS_FILENAME = ".credentials"

0 commit comments

Comments
 (0)