From 6a18061b767ac807fd12953b8aedebad259410ea Mon Sep 17 00:00:00 2001 From: bhannawa <78210177+bhannawa@users.noreply.github.com> Date: Tue, 19 Nov 2024 04:05:48 -0500 Subject: [PATCH] add module to extract message values for use in scripting (#106) --- pyulog/extract_message.py | 53 ++++++++++++++++++++++++++++++++++++ test/test_extract_message.py | 37 +++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 pyulog/extract_message.py create mode 100644 test/test_extract_message.py diff --git a/pyulog/extract_message.py b/pyulog/extract_message.py new file mode 100644 index 0000000..2367312 --- /dev/null +++ b/pyulog/extract_message.py @@ -0,0 +1,53 @@ +""" +Extract values from a ULog file message to use in scripting +""" + +import numpy as np +from .core import ULog + +def extract_message(ulog_file_name: str, message: str, + time_s: "int | None" = None, time_e: "int | None" = None, + disable_str_exceptions: bool = False) -> list[dict]: + """ + Extract values from a ULog file + + :param ulog_file_name: (str) The ULog filename to open and read + :param message: (str) A ULog message to return values from + :param time_s: (int) Offset time for conversion in seconds + :param time_e: (int) Limit until time for conversion in seconds + + :return: (list[dict]) A list of each record from the ULog as key-value pairs + """ + + if not isinstance(message, str): + raise AttributeError("Must provide a message to pull from ULog file") + + ulog = ULog(ulog_file_name, message, disable_str_exceptions) + + try: + data = ulog.get_dataset(message) + except Exception as exc: + raise AttributeError("Provided message is not in the ULog file") from exc + + values = [] + + # use same field order as in the log, except for the timestamp + data_keys = [f.field_name for f in data.field_data] + data_keys.remove('timestamp') + data_keys.insert(0, 'timestamp') # we want timestamp at first position + + #get the index for row where timestamp exceeds or equals the required value + time_s_i = np.where(data.data['timestamp'] >= time_s * 1e6)[0][0] \ + if time_s else 0 + #get the index for row upto the timestamp of the required value + time_e_i = np.where(data.data['timestamp'] >= time_e * 1e6)[0][0] \ + if time_e else len(data.data['timestamp']) + + # write the data + for i in range(time_s_i, time_e_i): + row = {} + for key in data_keys: + row[key] = data.data[key][i] + values.append(row) + + return values diff --git a/test/test_extract_message.py b/test/test_extract_message.py new file mode 100644 index 0000000..a365b77 --- /dev/null +++ b/test/test_extract_message.py @@ -0,0 +1,37 @@ +''' +Test extract_message module +''' + +import os +import inspect +import unittest + +from ddt import ddt, data + +from pyulog.extract_message import extract_message + +TEST_PATH = os.path.dirname(os.path.abspath( + inspect.getfile(inspect.currentframe()))) + +@ddt +class TestExtractMessage(unittest.TestCase): + """ + Test extract_message module. + """ + + @data('sample') + def test_extract_message(self, test_case): + """ + Test that extract_message module runs without error. + """ + + ulog_file_name = os.path.join(TEST_PATH, test_case+'.ulg') + message = "actuator_controls_0" + time_s = None + time_e = None + extract_message(ulog_file_name, + message, + time_s, + time_e) + +# vim: set et fenc=utf-8 ft=python ff=unix sts=4 sw=4 ts=4