Skip to content

Commit

Permalink
Updated SQLite storage to use max_read_records_count as a limit for d…
Browse files Browse the repository at this point in the history
…ata fetching and minor optimizations
  • Loading branch information
imbeacon committed Jan 15, 2025
1 parent 8bff6c0 commit d2e0133
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 17 deletions.
41 changes: 40 additions & 1 deletion tests/unit/service/test_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@

from thingsboard_gateway.storage.file.file_event_storage import FileEventStorage
from thingsboard_gateway.storage.memory.memory_event_storage import MemoryEventStorage
from thingsboard_gateway.storage.sqlite.sqlite_event_storage import SQLiteEventStorage

LOG = getLogger("TEST")


class TestStorage(TestCase):
def test_memory_storage(self):

Expand Down Expand Up @@ -47,7 +49,7 @@ def test_file_storage(self):

for test_value in range(test_size * 10):
storage.put(str(test_value))
sleep(.01)
sleep(.001)

result = []
for _ in range(test_size):
Expand All @@ -63,3 +65,40 @@ def test_file_storage(self):
remove(storage_test_config["data_folder_path"]+"/"+file)
removedirs(storage_test_config["data_folder_path"])
self.assertListEqual(result, correct_result)

def test_sqlite_storage(self):
storage_test_config = {
"data_file_path": "storage/data/data.db",
"messages_ttl_check_in_hours": 1,
"messages_ttl_in_days": 7,
"max_read_records_count": 70
}

storage = SQLiteEventStorage(storage_test_config, LOG)
test_size = 20
expected_result = []
save_results = []

for test_value_int in range(test_size * 10):
test_value = str(test_value_int)
expected_result.append(test_value)
save_result = storage.put(test_value)
save_results.append(save_result)
sleep(.01)
sleep(1)

self.assertTrue(all(save_results))

result = []
for _ in range(test_size):
batch = storage.get_event_pack()
result.append(batch)
storage.event_pack_processing_done()

unpacked_result = []
for batch in result:
for item in batch:
unpacked_result.append(item)

remove(storage_test_config["data_file_path"])
self.assertListEqual(unpacked_result, expected_result)
29 changes: 16 additions & 13 deletions thingsboard_gateway/storage/sqlite/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,13 @@ def init_table(self):
self.db.commit()
except Exception as e:
self.db.rollback()
self.__log.exception(e)
self.__log.exception("Failed to create table! Error: %s", e)

def run(self):
while not self.stopped.is_set():
self.process()

sleep(.2)
if self.processQueue.empty():
sleep(.1)

def process(self):
try:
Expand All @@ -89,20 +89,22 @@ def process(self):
# Signalization so that we can spam call process()
if not self.stopped.is_set() and self.processQueue:
while self.processQueue.qsize() > 0:
try:

req = self.processQueue.get()

self.__log.debug("Processing %s" % req.type)
if req.type is DatabaseActionType.WRITE_DATA_STORAGE:
req = self.processQueue.get()

message = req.data
self.__log.debug("Processing %s" % req.type)
if req.type is DatabaseActionType.WRITE_DATA_STORAGE:

timestamp = time()
message = req.data

self.db.execute('''INSERT INTO messages (timestamp, message) VALUES (?, ?);''',
[timestamp, message])
timestamp = time()

self.db.commit()
self.db.execute('''INSERT INTO messages (timestamp, message) VALUES (?, ?);''',
[timestamp, message])
self.db.commit()
except Exception as e:
self.__log.error("Failed to process request! Error: %s", e)
else:
self.__log.info("Storage is closed!")

Expand All @@ -112,7 +114,8 @@ def process(self):

def read_data(self):
try:
data = self.db.execute('''SELECT timestamp, message FROM messages ORDER BY timestamp ASC LIMIT 0, 50;''')
data = self.db.execute('''SELECT timestamp, message FROM messages ORDER BY timestamp ASC LIMIT 0, %i;''' %
self.settings.max_read_records_count)
return data
except Exception as e:
self.db.rollback()
Expand Down
7 changes: 4 additions & 3 deletions thingsboard_gateway/storage/sqlite/database_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,18 @@ def commit(self):
"""
self.__log.debug("Committing changes to DB")

commited = False
while not commited and not self.stopped.is_set():
committed = False
while not committed and not self.stopped.is_set():
try:
self.__commit()
commited = True
committed = True
except sqlite3.ProgrammingError as e:
self.__log.exception("Failed to commit changes to database", exc_info=e)
self.__log.info('Trying to reconnect to database')
self.connect()
except Exception as e:
self.__log.exception("Failed to commit changes to database", exc_info=e)
return committed

def __commit(self):
with self.lock:
Expand Down
1 change: 1 addition & 0 deletions thingsboard_gateway/storage/sqlite/storage_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ def __init__(self, config):
self.data_folder_path = config.get("data_file_path", "./")
self.messages_ttl_check_in_hours = config.get('messages_ttl_check_in_hours', 1) * 3600
self.messages_ttl_in_days = config.get('messages_ttl_in_days', 7)
self.max_read_records_count = config.get('max_read_records_count', 1000)

0 comments on commit d2e0133

Please sign in to comment.