MNT: Minimize the datatypes created from space_packet_parser
Previously, all numpy arrays were created with the default int64/float64
dtypes; np.array([1, 2, 3]) isn't minimized. We know the expected datatype
from space_packet_parser's XTCE definition, so we can infer a good numpy
datatype and minimize the size.
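As a rough sketch of that idea (illustrative only, not code from this commit): numpy defaults to 64-bit types when building an array from a Python list, while passing the dtype implied by the packet definition, here assumed to be an unsigned 8-bit field, stores one byte per element.

import numpy as np

values = [1, 2, 3]  # e.g. values parsed from a packet field

# Default construction: numpy picks int64 on most platforms (8 bytes/element)
default_arr = np.array(values)
print(default_arr.dtype, default_arr.nbytes)  # int64 24

# If the XTCE encoding says the field is an unsigned 8-bit integer,
# request that dtype explicitly (1 byte/element)
small_arr = np.array(values, dtype="uint8")
print(small_arr.dtype, small_arr.nbytes)  # uint8 3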
greglucas committed Jul 30, 2024
1 parent ba50882 commit d7e7bef
Showing 3 changed files with 133 additions and 4 deletions.
2 changes: 1 addition & 1 deletion imap_processing/swapi/l1/swapi_l1.py
@@ -105,7 +105,7 @@ def decompress_count(
    # Decompress counts based on compression indicators
    # If 0, value is already decompressed. If 1, value is compressed.
    # If 1 and count is 0xFFFF, value is overflow.
-    new_count = copy.deepcopy(count_data)
+    new_count = copy.deepcopy(count_data).astype(np.int32)

    # If data is compressed, decompress it
    compressed_indices = compression_flag == 1
37 changes: 36 additions & 1 deletion imap_processing/tests/test_utils.py
@@ -5,7 +5,7 @@
import pytest
import xarray as xr

-from imap_processing import utils
+from imap_processing import imap_module_directory, utils


def test_convert_raw_to_eu(tmp_path):
@@ -69,3 +69,38 @@ def test_convert_raw_to_eu(tmp_path):
    eu_dataset = utils.convert_raw_to_eu(
        dn_dataset.copy(), test_csv.absolute(), "PACKET_1", comment="#"
    )


@pytest.mark.parametrize(
    "use_derived_value, expected_mode", [(True, "HVENG"), (False, 2)]
)
def test_packet_file_to_datasets(use_derived_value, expected_mode):
    """
    Test that the datatypes aren't all int64 and that we get
    uint8/uint16 from header items as expected.

    Test that we get multiple apids in the output.
    """
    test_file = "tests/swapi/l0_data/imap_swapi_l0_raw_20231012_v001.pkts"
    packet_files = imap_module_directory / test_file
    packet_definition = (
        imap_module_directory / "swapi/packet_definitions/swapi_packet_definition.xml"
    )
    datasets_by_apid = utils.packet_file_to_datasets(
        packet_files, packet_definition, use_derived_value=use_derived_value
    )
    # 3 apids in the test data
    assert len(datasets_by_apid) == 3
    data = datasets_by_apid[1188]
    assert data["sec_hdr_flg"].dtype == np.uint8
    assert data["pkt_apid"].dtype == np.uint16
    np.testing.assert_array_equal(data["mode"], [expected_mode] * len(data["mode"]))


def test__create_minimum_dtype_array():
    """Test expected return types for minimum data types."""
    result = utils._create_minimum_dtype_array([1, 2, 3], "uint8")
    assert result.dtype == np.dtype("uint8")
    # fallback to a generic array if the requested dtype can't be satisfied
    result = utils._create_minimum_dtype_array(["a", "b", "c"], "uint8")
    assert result.dtype == np.dtype("<U1")
98 changes: 96 additions & 2 deletions imap_processing/utils.py
@@ -226,6 +226,84 @@ def create_dataset(
    return dataset


def _get_minimum_numpy_datatype(  # noqa: PLR0912 - Too many branches
    name: str, definition: xtcedef.XtcePacketDefinition
) -> str:
    """
    Get the minimum datatype for a given variable.

    Parameters
    ----------
    name : str
        The variable name.
    definition : xtcedef.XtcePacketDefinition
        The XTCE packet definition.

    Returns
    -------
    datatype : str
        The minimum datatype.
    """
    data_encoding = definition.named_parameters[name].parameter_type.encoding

    if isinstance(data_encoding, xtcedef.NumericDataEncoding):
        nbits = data_encoding.size_in_bits
        if isinstance(data_encoding, xtcedef.IntegerDataEncoding):
            datatype = "int"
            if data_encoding.encoding == "unsigned":
                datatype = "uint"
            if nbits <= 8:
                datatype += "8"
            elif nbits <= 16:
                datatype += "16"
            elif nbits <= 32:
                datatype += "32"
            else:
                datatype += "64"
        elif isinstance(data_encoding, xtcedef.FloatDataEncoding):
            datatype = "float"
            if nbits == 32:
                datatype += "32"
            else:
                datatype += "64"
    elif isinstance(data_encoding, xtcedef.BinaryDataEncoding):
        # TODO: Binary string representation right now, do we want bytes or
        # something else like the new StringDType instead?
        datatype = "str"
    elif isinstance(data_encoding, xtcedef.StringDataEncoding):
        # TODO: Use the new StringDType instead?
        datatype = "str"
    else:
        raise ValueError(f"Unsupported data encoding: {data_encoding}")

    return datatype


def _create_minimum_dtype_array(values: list, dtype: str) -> np.ndarray:
    """
    Create an array with the minimum datatype.

    If the values can't be coerced to that datatype, fall back to general
    array creation without a specific datatype. This can happen with
    derived values.

    Parameters
    ----------
    values : list
        List of values.
    dtype : str
        The datatype.

    Returns
    -------
    array : np.ndarray
        The array of values.
    """
    try:
        return np.array(values, dtype=dtype)
    except ValueError:
        return np.array(values)

def packet_file_to_datasets(
    packet_file: Union[str, Path],
    xtce_packet_definition: Union[str, Path],
@@ -261,6 +339,8 @@ def packet_file_to_datasets(
    # dataset per apid.
    # {apid1: dataset1, apid2: dataset2, ...}
    data_dict: dict[int, dict] = dict()
    # Also keep track of the datatype mapping for each field
    datatype_mapping: dict[int, dict] = dict()

    # Set up the parser from the input packet definition
    packet_definition = xtcedef.XtcePacketDefinition(xtce_packet_definition)
@@ -273,6 +353,7 @@
        if apid not in data_dict:
            # This is the first packet for this APID
            data_dict[apid] = collections.defaultdict(list)
            datatype_mapping[apid] = dict()

        # TODO: Do we want to give an option to remove the header content?
        packet_content = packet.data | packet.header
@@ -283,16 +364,29 @@
            # Use the derived value if it exists, otherwise use the raw value
            val = value.derived_value or val
            data_dict[apid][key].append(val)
            if key not in datatype_mapping[apid]:
                # Add this datatype to the mapping
                datatype_mapping[apid][key] = _get_minimum_numpy_datatype(
                    key, packet_definition
                )

    dataset_by_apid = {}
    # Convert each apid's data to an xarray dataset

    for apid, data in data_dict.items():
        # The time key is always the first key in the data dictionary on IMAP
        time_key = next(iter(data.keys()))
        # Convert to J2000 time and use that as our primary dimension
        time_data = met_to_j2000ns(data[time_key])
        ds = xr.Dataset(
-            {key.lower(): ("epoch", val) for key, val in data.items()},
+            {
+                key.lower(): (
+                    "epoch",
+                    _create_minimum_dtype_array(
+                        list_of_values, dtype=datatype_mapping[apid][key]
+                    ),
+                )
+                for key, list_of_values in data.items()
+            },
            coords={"epoch": time_data},
        )
        ds = ds.sortby("epoch")