diff --git a/imap_processing/cdf/utils.py b/imap_processing/cdf/utils.py
index 8b9d2b06a..b4813dc71 100644
--- a/imap_processing/cdf/utils.py
+++ b/imap_processing/cdf/utils.py
@@ -103,7 +103,7 @@ def load_cdf(
     return dataset
 
 
-def write_cdf(dataset: xr.Dataset) -> Path:
+def write_cdf(dataset: xr.Dataset, **extra_cdf_kwargs: dict) -> Path:
     """
     Write the contents of "data" to a CDF file using cdflib.xarray_to_cdf.
 
@@ -118,6 +118,8 @@ def write_cdf(dataset: xr.Dataset) -> Path:
     ----------
     dataset : xarray.Dataset
         The dataset object to convert to a CDF.
+    **extra_cdf_kwargs : dict
+        Additional keyword arguments to pass to the ``xarray_to_cdf`` function.
 
     Returns
     -------
@@ -166,6 +168,7 @@ def write_cdf(dataset: xr.Dataset) -> Path:
         dataset,
         str(file_path),
         terminate_on_warning=True,
+        **extra_cdf_kwargs,
     )  # Terminate if not ISTP compliant
 
     return file_path
diff --git a/imap_processing/hi/l1a/hi_l1a.py b/imap_processing/hi/l1a/hi_l1a.py
index 3388f5796..132a5b0e2 100644
--- a/imap_processing/hi/l1a/hi_l1a.py
+++ b/imap_processing/hi/l1a/hi_l1a.py
@@ -1,20 +1,22 @@
 """IMAP-HI L1A processing module."""
 
 import logging
+from pathlib import Path
+from typing import Union
 
 import xarray as xr
 
-from imap_processing.hi.l0 import decom_hi
+from imap_processing import imap_module_directory
 from imap_processing.hi.l1a.histogram import create_dataset as hist_create_dataset
 from imap_processing.hi.l1a.housekeeping import process_housekeeping
 from imap_processing.hi.l1a.science_direct_event import science_direct_event
 from imap_processing.hi.utils import HIAPID
-from imap_processing.utils import group_by_apid
+from imap_processing.utils import packet_file_to_datasets
 
 logger = logging.getLogger(__name__)
 
 
-def hi_l1a(packet_file_path: str, data_version: str) -> list[xr.Dataset]:
+def hi_l1a(packet_file_path: Union[str, Path], data_version: str) -> list[xr.Dataset]:
     """
     Will process IMAP raw data to l1a.
 
@@ -30,30 +32,32 @@ def hi_l1a(packet_file_path: str, data_version: str) -> list[xr.Dataset]:
     processed_data : list[xarray.Dataset]
         List of processed xarray datasets.
     """
-    unpacked_data = decom_hi.decom_packets(packet_file_path)
-
-    # group data by apid
-    grouped_data = group_by_apid(unpacked_data)
+    packet_def_file = (
+        imap_module_directory / "hi/packet_definitions/hi_packet_definition.xml"
+    )
+    datasets_by_apid = packet_file_to_datasets(
+        packet_file=packet_file_path, xtce_packet_definition=packet_def_file
+    )
 
     # Process science to l1a.
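# ~~~~ editor's example (annotation, not part of the diff) ~~~~
# packet_file_to_datasets (called above) returns a plain mapping of
# integer APID -> xarray.Dataset, so the branches below simply index it.
# A minimal sketch, assuming HIAPID is an IntEnum so its members work
# directly as dictionary keys:
#
#     hist_ds = datasets_by_apid[HIAPID.H45_SCI_CNT]  # histogram packets
#     de_ds = datasets_by_apid[HIAPID.H45_SCI_DE]     # direct-event packets
#     hk_ds = datasets_by_apid[HIAPID.H45_APP_NHK]    # housekeeping packets
# ~~~~ end editor's example ~~~~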
     processed_data = []
-    for apid in grouped_data.keys():
+    for apid in datasets_by_apid:
         if apid == HIAPID.H45_SCI_CNT:
             logger.info(
                 "Processing histogram data for [%s] packets", HIAPID.H45_SCI_CNT.name
             )
-            data = hist_create_dataset(grouped_data[apid])
+            data = hist_create_dataset(datasets_by_apid[apid])
         elif apid == HIAPID.H45_SCI_DE:
             logger.info(
                 "Processing direct event data for [%s] packets", HIAPID.H45_SCI_DE.name
             )
-            data = science_direct_event(grouped_data[apid])
+            data = science_direct_event(datasets_by_apid[apid])
         elif apid == HIAPID.H45_APP_NHK:
             logger.info(
                 "Processing housekeeping data for [%s] packets", HIAPID.H45_APP_NHK.name
             )
-            data = process_housekeeping(grouped_data[apid])
+            data = process_housekeeping(datasets_by_apid[apid])
         else:
             raise RuntimeError(f"Encountered unexpected APID [{apid}]")
diff --git a/imap_processing/hi/l1a/histogram.py b/imap_processing/hi/l1a/histogram.py
index 583a96445..c1356faeb 100644
--- a/imap_processing/hi/l1a/histogram.py
+++ b/imap_processing/hi/l1a/histogram.py
@@ -2,10 +2,8 @@
 
 import numpy as np
 import xarray as xr
-from space_packet_parser.parser import Packet
 
 from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
-from imap_processing.cdf.utils import met_to_j2000ns
 
 # define the names of the 24 counter arrays
 # contained in the histogram packet
@@ -35,30 +33,34 @@ TOTAL_COUNTERS = ("total_a", "total_b", "total_c", "fee_de_sent", "fee_de_recd")
 
 
-def create_dataset(packets: list[Packet]) -> xr.Dataset:
+def create_dataset(input_ds: xr.Dataset) -> xr.Dataset:
     """
     Create dataset for a number of Hi Histogram packets.
 
     Parameters
     ----------
-    packets : list[space_packet_parser.ParsedPacket]
-        Packet list.
+    input_ds : xarray.Dataset
+        Dataset of packets.
 
     Returns
     -------
     dataset : xarray.Dataset
         Dataset with all metadata field data in xr.DataArray.
     """
-    dataset = allocate_histogram_dataset(len(packets))
+    dataset = allocate_histogram_dataset(len(input_ds.epoch))
 
-    # unpack the packets data into the Dataset
-    for i_epoch, packet in enumerate(packets):
-        dataset.epoch.data[i_epoch] = met_to_j2000ns(packet.data["CCSDS_MET"].raw_value)
-        dataset.ccsds_met[i_epoch] = packet.data["CCSDS_MET"].raw_value
-        dataset.esa_stepping_num[i_epoch] = packet.data["ESA_STEP"].raw_value
+    # TODO: Move into the allocate dataset function?
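# ~~~~ editor's example (annotation, not part of the diff) ~~~~
# One possible answer to the vectorization TODO in the counter loop below:
# the 12-bit unpacking can be done in numpy instead of a Python int() loop.
# A sketch, assuming counters_binary_data is the "0"/"1" bit string of one
# packet (24 counters x 90 angles x 12 bits, most significant bit first):
#
#     bits = np.frombuffer(counters_binary_data.encode(), np.uint8) - ord("0")
#     weights = 1 << np.arange(11, -1, -1)
#     counters = (bits.reshape(24 * 90, 12) * weights).sum(axis=1)
# ~~~~ end editor's example ~~~~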
+ dataset["epoch"].data[:] = input_ds["epoch"].data + dataset["ccsds_met"].data = input_ds["ccsds_met"].data + dataset["esa_stepping_num"].data = input_ds["esa_step"].data + # unpack the packets data into the Dataset + # (npackets, 24 * 90 * 12) + # TODO: Look into avoiding the for-loops below + # It seems like we could try to reshape the arrays and do some numpy + # broadcasting rather than for-loops directly here + for i_epoch, counters_binary_data in enumerate(input_ds["counters"].data): # unpack 24 arrays of 90 12-bit unsigned integers - counters_binary_data = packet.data["COUNTERS"].raw_value counter_ints = [ int(counters_binary_data[i * 12 : (i + 1) * 12], 2) for i in range(90 * 24) ] diff --git a/imap_processing/hi/l1a/housekeeping.py b/imap_processing/hi/l1a/housekeeping.py index d48f7ce5b..572d3229b 100644 --- a/imap_processing/hi/l1a/housekeeping.py +++ b/imap_processing/hi/l1a/housekeeping.py @@ -1,34 +1,26 @@ """Unpack IMAP-Hi housekeeping data.""" import xarray as xr -from space_packet_parser.parser import Packet from imap_processing.hi.hi_cdf_attrs import ( hi_hk_l1a_attrs, ) -from imap_processing.utils import create_dataset, update_epoch_to_datetime -def process_housekeeping(packets: list[Packet]) -> xr.Dataset: +def process_housekeeping(dataset: xr.Dataset) -> xr.Dataset: """ Create dataset for each metadata field. Parameters ---------- - packets : list[space_packet_parser.ParsedPacket] - Packet list. + dataset : xarray.Dataset + Packet input dataset. Returns ------- dataset : xarray.Dataset Dataset with all metadata field data in xr.DataArray. """ - dataset = create_dataset( - packets=packets, spacecraft_time_key="ccsds_met", skip_keys=["INSTR_SPECIFIC"] - ) - # Update epoch to datetime - dataset = update_epoch_to_datetime(dataset) - # Add datalevel attrs dataset.attrs.update(hi_hk_l1a_attrs.output()) return dataset diff --git a/imap_processing/hi/l1a/science_direct_event.py b/imap_processing/hi/l1a/science_direct_event.py index 7b20c0cc3..6711ef36f 100644 --- a/imap_processing/hi/l1a/science_direct_event.py +++ b/imap_processing/hi/l1a/science_direct_event.py @@ -2,7 +2,6 @@ import numpy as np import xarray as xr -from space_packet_parser.parser import Packet from imap_processing import imap_module_directory from imap_processing.cdf.cdf_attribute_manager import CdfAttributeManager @@ -297,7 +296,7 @@ def create_dataset(de_data_list: list, packet_met_time: list) -> xr.Dataset: return dataset -def science_direct_event(packets_data: list[Packet]) -> xr.Dataset: +def science_direct_event(packets_data: xr.Dataset) -> xr.Dataset: """ Unpack IMAP-Hi direct event data. @@ -309,8 +308,8 @@ def science_direct_event(packets_data: list[Packet]) -> xr.Dataset: Parameters ---------- - packets_data : list[space_packet_parser.ParsedPacket] - List of packets data. + packets_data : xarray.Dataset + Packets extracted into a dataset. Returns ------- @@ -324,14 +323,14 @@ def science_direct_event(packets_data: list[Packet]) -> xr.Dataset: # I am using extend to add another list to the # end of the list. This way, I don't need to flatten # the list later. 
-    for data in packets_data:
+    for i, data in enumerate(packets_data["de_tof"].data):
         # break binary stream data into units of 48 bits
-        event_48bits_list = break_into_bits_size(data.data["DE_TOF"].raw_value)
+        event_48bits_list = break_into_bits_size(data)
         # parse each 48-bit chunk into meaningful data such as metaevent or direct event
         de_data_list.extend([parse_direct_event(event) for event in event_48bits_list])
         # add packet time to packet_met_time
         packet_met_time.extend(
-            [data.data["CCSDS_MET"].raw_value] * len(event_48bits_list)
+            [packets_data["ccsds_met"].data[i]] * len(event_48bits_list)
         )
 
     # create dataset
diff --git a/imap_processing/swapi/l1/swapi_l1.py b/imap_processing/swapi/l1/swapi_l1.py
index 1becd7d87..c7569479e 100644
--- a/imap_processing/swapi/l1/swapi_l1.py
+++ b/imap_processing/swapi/l1/swapi_l1.py
@@ -8,8 +8,6 @@
 
 from imap_processing import imap_module_directory
 from imap_processing.cdf.global_attrs import ConstantCoordinates
-from imap_processing.cdf.utils import met_to_j2000ns
-from imap_processing.decom import decom_packets
 from imap_processing.swapi.swapi_cdf_attrs import (
     compression_attrs,
     counts_attrs,
@@ -19,12 +17,7 @@
     uncertainty_attrs,
 )
 from imap_processing.swapi.swapi_utils import SWAPIAPID, SWAPIMODE
-from imap_processing.utils import (
-    create_dataset,
-    group_by_apid,
-    sort_by_time,
-    update_epoch_to_datetime,
-)
+from imap_processing.utils import packet_file_to_datasets
 
 
 def filter_good_data(full_sweep_sci: xr.Dataset) -> np.ndarray:
@@ -172,7 +165,7 @@ def find_sweep_starts(packets: xr.Dataset) -> np.ndarray:
     #
     # [0 0 0 1 0 0 0 0 0 0 0 0 1 0 0 0 0]
     # And all?
-    ione = diff == 1
+    ione = diff == 1e9  # 1 second (epoch is now in J2000 nanoseconds)
 
     valid = (
         (packets["seq_number"] == 0)[:-11]
@@ -474,10 +467,10 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Data
     # ===================================================================
 
     # epoch time. Should be same dimension as number of good sweeps
-    epoch_time = good_sweep_sci["epoch"].data.reshape(total_full_sweeps, 12)[:, 0]
-    epoch_converted_time = met_to_j2000ns(epoch_time)
+    epoch_values = good_sweep_sci["epoch"].data.reshape(total_full_sweeps, 12)[:, 0]
+
     epoch_time = xr.DataArray(
-        epoch_converted_time,
+        epoch_values,
         name="epoch",
         dims=["epoch"],
         attrs=ConstantCoordinates.EPOCH,
@@ -532,6 +525,7 @@ def process_swapi_science(sci_dataset: xr.Dataset, data_version: str) -> xr.Data
     )
 
     # L1 quality flags
+    # TODO: Should these be kept in raw format rather than derived into strings?
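# ~~~~ editor's example (annotation, not part of the diff) ~~~~
# Context for the TODO above: swapi_l1 now decodes packets with
# use_derived_value=False (see below), so these flags arrive as raw
# integers rather than XTCE-derived string labels and can be stored
# compactly, e.g.:
#
#     np.array([0, 1, 1, 0], dtype=np.uint8)  # raw flags, CDF-ready
# ~~~~ end editor's example ~~~~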
dataset["swp_pcem_flags"] = xr.DataArray( np.array(pcem_compression_flags, dtype=np.uint8), dims=["epoch", "energy"], @@ -626,25 +620,19 @@ def swapi_l1(file_path: str, data_version: str) -> xr.Dataset: xtce_definition = ( f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml" ) - packets = decom_packets( - packet_file=file_path, xtce_packet_definition=xtce_definition + datasets = packet_file_to_datasets( + file_path, xtce_definition, use_derived_value=False ) - grouped_packets = group_by_apid(packets) processed_data = [] - for apid in grouped_packets.keys(): - sorted_packets = sort_by_time(grouped_packets[apid], "SHCOARSE") + for apid, ds_data in datasets.items(): # Right now, we only process SWP_HK and SWP_SCI # other packets are not process in this processing pipeline # If appId is science, then the file should contain all data of science appId - ds_data = create_dataset(sorted_packets, include_header=False) if apid == SWAPIAPID.SWP_SCI.value: data = process_swapi_science(ds_data, data_version) processed_data.append(data) if apid == SWAPIAPID.SWP_HK.value: - # convert epoch to datetime - ds_data = update_epoch_to_datetime(ds_data) - # Add datalevel attrs ds_data.attrs.update(swapi_l1_hk_attrs.output()) ds_data.attrs["Data_version"] = data_version diff --git a/imap_processing/tests/hi/test_l1a.py b/imap_processing/tests/hi/test_l1a.py index 5d1b608bc..bd6c1d280 100644 --- a/imap_processing/tests/hi/test_l1a.py +++ b/imap_processing/tests/hi/test_l1a.py @@ -57,7 +57,7 @@ def test_app_nhk_decom(): # TODO: compare with validation data once we have it # Write CDF - cem_raw_cdf_filepath = write_cdf(processed_data[0]) + cem_raw_cdf_filepath = write_cdf(processed_data[0], istp=False) # TODO: ask Vivek about this date mismatch between the file name # and the data. May get resolved when we have good sample data. 
diff --git a/imap_processing/tests/swapi/test_swapi_l1.py b/imap_processing/tests/swapi/test_swapi_l1.py
index 12efc71c3..b3631f2c0 100644
--- a/imap_processing/tests/swapi/test_swapi_l1.py
+++ b/imap_processing/tests/swapi/test_swapi_l1.py
@@ -4,7 +4,6 @@
 
 from imap_processing import imap_module_directory
 from imap_processing.cdf.utils import met_to_j2000ns, write_cdf
-from imap_processing.decom import decom_packets
 from imap_processing.swapi.l1.swapi_l1 import (
     SWAPIAPID,
     decompress_count,
@@ -15,21 +14,21 @@
     process_sweep_data,
     swapi_l1,
 )
-from imap_processing.utils import create_dataset, group_by_apid, sort_by_time
+from imap_processing.swapi.swapi_utils import SWAPIMODE
+from imap_processing.utils import packet_file_to_datasets
 
 
 @pytest.fixture(scope="session")
 def decom_test_data():
     """Read test data from file"""
-    test_folder_path = "tests/swapi/l0_data"
-    packet_files = list(imap_module_directory.glob(f"{test_folder_path}/*.pkts"))
+    test_file = "tests/swapi/l0_data/imap_swapi_l0_raw_20231012_v001.pkts"
+    packet_file = imap_module_directory / test_file
     packet_definition = (
         f"{imap_module_directory}/swapi/packet_definitions/swapi_packet_definition.xml"
     )
-    data_list = []
-    for packet_file in packet_files:
-        data_list.extend(decom_packets(packet_file, packet_definition))
-    return data_list
+    return packet_file_to_datasets(
+        packet_file, packet_definition, use_derived_value=False
+    )
 
 
 def test_filter_good_data():
@@ -40,7 +39,7 @@ def test_filter_good_data():
         {
             "plan_id_science": xr.DataArray(np.full((total_sweeps * 12), 1)),
             "sweep_table": xr.DataArray(np.repeat(np.arange(total_sweeps), 12)),
-            "mode": xr.DataArray(np.full((total_sweeps * 12), 2)),
+            "mode": xr.DataArray(np.full((total_sweeps * 12), SWAPIMODE.HVENG.value)),
         },
         coords={"epoch": np.arange(total_sweeps * 12)},
     )
@@ -49,17 +48,19 @@ def test_filter_good_data():
     bad_data_indices = filter_good_data(ds)
     assert len(bad_data_indices) == 36
 
-    # Check for bad MODE data.
-    # This test returns this indices because MODE has 0, 1 values
-    # for the first two sweeps.
+    # Check for bad MODE data, only HVENG is "good"
     # TODO: update test when we update MODE from HVENG to HVSCI
-    ds["mode"] = xr.DataArray(np.repeat(np.arange(total_sweeps), 12))
+    ds["mode"] = xr.DataArray(
+        np.repeat(
+            [SWAPIMODE.LVENG.value, SWAPIMODE.LVSCI.value, SWAPIMODE.HVENG.value], 12
+        )
+    )
     bad_data_indices = filter_good_data(ds)
     np.testing.assert_array_equal(bad_data_indices, np.arange(24, 36))
 
     # Check for bad sweep_table data.
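# ~~~~ editor's example (annotation, not part of the diff) ~~~~
# Why the SWAPIMODE swap in this test is behavior-preserving: the old
# hard-coded 2 equals HVENG, assuming the enum is ordered
# LVENG=0, LVSCI=1, HVENG=2, HVSCI=3 (values inferred from the magic
# numbers this test previously used):
#
#     assert SWAPIMODE.HVENG.value == 2
# ~~~~ end editor's example ~~~~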
     # Reset MODE data and create first sweep to be mixed value
-    ds["mode"] = xr.DataArray(np.full((total_sweeps * 12), 2))
+    ds["mode"] = xr.DataArray(np.full((total_sweeps * 12), SWAPIMODE.HVENG.value))
     ds["sweep_table"][:12] = np.arange(0, 12)
     np.testing.assert_array_equal(filter_good_data(ds), np.arange(12, 36))
 
@@ -87,7 +88,9 @@ def test_find_sweep_starts():
     """Test for find sweep starts"""
     time = np.arange(26)
     sequence_number = time % 12
-    ds = xr.Dataset({"seq_number": sequence_number}, coords={"epoch": time})
+    ds = xr.Dataset(
+        {"seq_number": sequence_number}, coords={"epoch": met_to_j2000ns(time)}
+    )
 
     start_indices = find_sweep_starts(ds)
     np.testing.assert_array_equal(start_indices, [0, 12])
@@ -107,7 +110,9 @@ def test_get_full_indices():
     """Test for correct full sweep indices"""
     time = np.arange(26)
     sequence_number = time % 12
-    ds = xr.Dataset({"seq_number": sequence_number}, coords={"epoch": time})
+    ds = xr.Dataset(
+        {"seq_number": sequence_number}, coords={"epoch": met_to_j2000ns(time)}
+    )
 
     sweep_indices = get_indices_of_full_sweep(ds)
     np.testing.assert_array_equal(sweep_indices, np.arange(0, 24))
@@ -115,10 +120,7 @@ def test_get_full_indices():
 
 def test_swapi_algorithm(decom_test_data):
     """Test SWAPI L1 algorithm"""
-    grouped_data = group_by_apid(decom_test_data)
-    science_data = grouped_data[SWAPIAPID.SWP_SCI]
-    sorted_packets = sort_by_time(science_data, "SHCOARSE")
-    ds_data = create_dataset(sorted_packets, include_header=False)
+    ds_data = decom_test_data[SWAPIAPID.SWP_SCI]
     full_sweep_indices = get_indices_of_full_sweep(ds_data)
     full_sweep_sci = ds_data.isel({"epoch": full_sweep_indices})
     total_packets = len(full_sweep_sci["seq_number"].data)
@@ -208,10 +210,7 @@ def test_swapi_algorithm(decom_test_data):
 
 def test_process_swapi_science(decom_test_data):
     """Test process swapi science"""
-    grouped_data = group_by_apid(decom_test_data)
-    science_data = grouped_data[SWAPIAPID.SWP_SCI]
-    sorted_packets = sort_by_time(science_data, "SHCOARSE")
-    ds_data = create_dataset(sorted_packets, include_header=False)
+    ds_data = decom_test_data[SWAPIAPID.SWP_SCI]
     processed_data = process_swapi_science(ds_data, data_version="001")
 
     # Test dataset dimensions
@@ -333,5 +332,6 @@ def test_swapi_l1_cdf():
 
     # hk cdf file
     cdf_filename = "imap_swapi_l1_hk_20100101_v001.cdf"
-    cdf_path = write_cdf(processed_data[1])
+    # Ignore ISTP checks for HK data
+    cdf_path = write_cdf(processed_data[1], istp=False)
     assert cdf_path.name == cdf_filename
diff --git a/imap_processing/utils.py b/imap_processing/utils.py
index 0553df428..2a70c6df0 100644
--- a/imap_processing/utils.py
+++ b/imap_processing/utils.py
@@ -3,12 +3,13 @@
 import collections
 import dataclasses
 import logging
-from typing import Optional
+from pathlib import Path
+from typing import Optional, Union
 
 import numpy as np
 import pandas as pd
 import xarray as xr
-from space_packet_parser.parser import Packet
+from space_packet_parser import parser, xtcedef
 
 from imap_processing.cdf.global_attrs import ConstantCoordinates
 from imap_processing.cdf.utils import met_to_j2000ns
@@ -142,7 +143,7 @@ def convert_raw_to_eu(
 
 
 def create_dataset(
-    packets: list[Packet],
+    packets: list[parser.Packet],
     spacecraft_time_key: str = "shcoarse",
     include_header: bool = True,
     skip_keys: Optional[list[str]] = None,
@@ -196,8 +197,7 @@ def create_dataset(
 
     # NOTE: At this point, we keep epoch time as raw value from packet
     # which is in seconds and spacecraft time. Some instruments use this
-    # raw value in processing. If you want to convert this to datetime
-    # object, you can use `update_epoch_to_datetime` function afterwards.
+    # raw value in processing.
     epoch_time = xr.DataArray(
         metadata_arrays[spacecraft_time_key],
         name="epoch",
         dims=["epoch"],
         attrs=ConstantCoordinates.EPOCH,
@@ -228,29 +228,77 @@ def create_dataset(
     return dataset
 
 
-def update_epoch_to_datetime(dataset: xr.Dataset) -> xr.Dataset:
+def packet_file_to_datasets(
+    packet_file: Union[str, Path],
+    xtce_packet_definition: Union[str, Path],
+    use_derived_value: bool = True,
+) -> dict[int, xr.Dataset]:
     """
-    Update epoch in dataset to datetime object.
+    Convert a packet file to xarray datasets.
+
+    The packet file can contain multiple APIDs, and these will be separated
+    into distinct datasets, one per APID. The datasets will contain the
+    ``derived_value`` of each data field, falling back to the ``raw_value``
+    if no ``derived_value`` is available. If there are conversions in the
+    XTCE packet definition, the ``derived_value`` will be the converted
+    value. The dataset dimension will be the time field in J2000 nanoseconds.
 
     Parameters
     ----------
-    dataset : xr.Dataset
-        Dataset to update.
+    packet_file : str or pathlib.Path
+        Path to the packet file to process.
+    xtce_packet_definition : str or pathlib.Path
+        Path to the XTCE packet definition file.
+    use_derived_value : bool, default True
+        Whether or not to use the derived value from the XTCE definition.
 
     Returns
     -------
-    dataset : xr.Dataset
-        Dataset with updated epoch dimension from int to datetime object.
+    datasets : dict
+        Mapping from APID to xarray dataset, one dataset per APID.
     """
-    # convert epoch to datetime
-    epoch_converted_time = met_to_j2000ns(dataset["epoch"])
-    # add attrs back to epoch
-    epoch = xr.DataArray(
-        epoch_converted_time,
-        name="epoch",
-        dims=["epoch"],
-        attrs=ConstantCoordinates.EPOCH,
-    )
-    dataset = dataset.assign_coords(epoch=epoch)
+    # Set up containers to store our data
+    # We are getting a packet file that may contain multiple apids
+    # Each apid has consistent data fields, so we want to create a
+    # dataset per apid.
+    # {apid1: dataset1, apid2: dataset2, ...}
+    data_dict: dict[int, dict] = dict()
+
+    # Set up the parser from the input packet definition
+    packet_definition = xtcedef.XtcePacketDefinition(xtce_packet_definition)
+    packet_parser = parser.PacketParser(packet_definition)
+
+    with open(packet_file, "rb") as binary_data:
+        packet_generator = packet_parser.generator(binary_data)
+        for packet in packet_generator:
+            apid = packet.header["PKT_APID"].raw_value
+            if apid not in data_dict:
+                # This is the first packet for this APID
+                data_dict[apid] = collections.defaultdict(list)
+
+            # TODO: Do we want to give an option to remove the header content?
+            packet_content = packet.data | packet.header
+
+            for key, value in packet_content.items():
+                val = value.raw_value
+                if use_derived_value:
+                    # Prefer the derived value; compare to None so falsy values like 0 are kept
+                    val = value.derived_value if value.derived_value is not None else val
+                data_dict[apid][key].append(val)
+
+    dataset_by_apid = {}
+    # Convert each apid's data to an xarray dataset
+    for apid, data in data_dict.items():
+        # The time key is always the first key in the data dictionary on IMAP
+        time_key = next(iter(data.keys()))
+        # Convert to J2000 time and use that as our primary dimension
+        time_data = met_to_j2000ns(data[time_key])
+        ds = xr.Dataset(
+            {key.lower(): ("epoch", val) for key, val in data.items()},
+            coords={"epoch": time_data},
+        )
+        ds = ds.sortby("epoch")
 
-    return dataset
+        dataset_by_apid[apid] = ds
+
+    return dataset_by_apid
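# ~~~~ editor's example (annotation, not part of the diff) ~~~~
# End-to-end usage of the new helper, mirroring the SWAPI and Hi call sites
# above (the .pkts file name here is hypothetical):

from imap_processing import imap_module_directory
from imap_processing.utils import packet_file_to_datasets

definition = (
    imap_module_directory / "swapi/packet_definitions/swapi_packet_definition.xml"
)
datasets = packet_file_to_datasets(
    packet_file="imap_swapi_l0_raw.pkts",  # hypothetical packet file
    xtce_packet_definition=definition,
    use_derived_value=False,  # keep raw packet values, as swapi_l1 does
)
for apid, ds in datasets.items():
    # one time-sorted dataset per APID, dimensioned on J2000-ns "epoch"
    print(apid, ds.sizes["epoch"])
# ~~~~ end editor's example ~~~~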