Refactor hit_l1a.py to use packet_file_to_datasets function (#828)
* WIP - updating file to use packet_file_to_datasets function in utils

* Add main for quick testing to view input data

* Replace packet definition files with updated file covering all HIT apids

* Finalize housekeeping l1a updates and set up start for science data

-Add function to handle concatenating leak_i variables (see the sketch after this list).
-Drop variables from housekeeping dataset that aren't needed
 for the CDF product.
-Update dimensions and add attributes to the housekeeping Dataset.
-Delete create_datasets function since packet_file_to_datasets
 creates xarray datasets and those just need to be updated. This
 will happen in housekeeping and science data processing functions.
-Add function to process science data (WIP).
-Clean up code and add/update docstrings and comments.
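
A minimal sketch of the leak_i concatenation described above, using a toy channel count (the flight housekeeping packet carries 64 leak_i_* variables; everything here other than the leak_i_*/adc_channels names is illustrative):

import numpy as np
import xarray as xr

n_channels = 4  # illustrative; the real packet has 64 (leak_i_00..leak_i_63)

# One 1D variable per ADC channel, as produced by packet decommutation
dataset = xr.Dataset(
    {f"leak_i_{i:02d}": ("epoch", np.full(3, i)) for i in range(n_channels)},
    coords={"epoch": np.arange(3)},
)

adc_channels = xr.DataArray(
    np.arange(n_channels, dtype=np.uint8), name="adc_channels", dims=["adc_channels"]
)

# Stack the per-channel variables into a single 2D (epoch, adc_channels) array
leak_vars = [dataset[f"leak_i_{i:02d}"] for i in range(n_channels)]
dataset["leak_i"] = xr.concat(leak_vars, dim=adc_channels).transpose(
    "epoch", "adc_channels"
)
dataset = dataset.drop_vars([f"leak_i_{i:02d}" for i in range(n_channels)])

print(dataset["leak_i"].shape)  # (3, 4)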

* Minor updates to docstring and comment

* Update unit tests for refactored hit_l1a.py.

hit_l1a.py was refactored to use the packet_file_to_datasets
function. The unit tests were updated to reflect these changes.

-Added new fixtures for attributes manager, datasets dict, and
 housekeeping dataset.
-Added new tests for the new functions (concatenating leak_i
 data and processing housekeeping); see the sketch after this
 list for the general shape of these tests.
-Added additional assertions for housekeeping dataset.
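
A hedged sketch of the style of test described above (the fixture and values are hypothetical; only concatenate_leak_variables and its signature come from the diff below):

import numpy as np
import pytest
import xarray as xr

from imap_processing.hit.l1a.hit_l1a import concatenate_leak_variables


@pytest.fixture()
def housekeeping_dataset():
    # Hypothetical miniature housekeeping dataset: 64 leak_i_* variables,
    # where channel i holds the constant value i at every epoch
    return xr.Dataset(
        {
            f"leak_i_{i:02d}": ("epoch", np.full(2, i, dtype=np.uint16))
            for i in range(64)
        },
        coords={"epoch": np.arange(2)},
    )


def test_concatenate_leak_variables(housekeeping_dataset):
    adc_channels = xr.DataArray(
        np.arange(64, dtype=np.uint8), name="adc_channels", dims=["adc_channels"]
    )
    updated = concatenate_leak_variables(housekeeping_dataset, adc_channels)

    # The individual variables are replaced by one 2D leak_i variable
    assert "leak_i_00" not in updated
    assert updated["leak_i"].shape == (2, 64)
    # Values survive the concatenation: column i should equal i
    np.testing.assert_array_equal(updated["leak_i"][:, 5], 5)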

* Remove main from hit_l1a.py

* Delete test for housekeeping data class. The data classes still need to be deleted, but only after hit_l1b is refactored to no longer use them

* Address PR comments/suggestions

* Assign attrs and dims directly to data arrays rather than re-create the data arrays

* Add adc_channels as a parameter to the function that concatenates the leak_i variables, to be assigned as a dimension. Also change dims from a list to a dict, since assign_coords takes in a dictionary
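
A small illustration of the dims change described above (attribute values are hypothetical; the DEPEND_* keys follow the CDF convention used in this file):

import numpy as np
import xarray as xr

# Hypothetical CDF attributes for a 2D housekeeping variable
attrs = {"DEPEND_0": "epoch", "DEPEND_1": "adc_channels", "UNITS": "uA"}

da = xr.DataArray(np.zeros((2, 64)), dims=[attrs["DEPEND_0"], attrs["DEPEND_1"]])

# assign_coords expects a mapping of name -> values (or keyword arguments),
# not a bare list of dimension names, hence the list -> dict change
da = da.assign_coords({"adc_channels": np.arange(64, dtype=np.uint8)})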

* Update test for concatenating leak_i variables to take in the new parameter. Also add an assertion to check that the values are correct

* Add attributes to epoch variable in dataset. Update test data file name
vmartinez-cu authored Sep 24, 2024
1 parent 6750acf commit ba5ad0b
Showing 6 changed files with 362 additions and 668 deletions.
324 changes: 163 additions & 161 deletions imap_processing/hit/l1a/hit_l1a.py
@@ -1,17 +1,14 @@
"""Decommutate HIT CCSDS data and create L1a data products."""

import logging
from collections import defaultdict
from dataclasses import fields
from enum import IntEnum

import numpy as np
import xarray as xr

from imap_processing import decom, imap_module_directory, utils
from imap_processing import imap_module_directory
from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.hit.l0.data_classes.housekeeping import Housekeeping
from imap_processing.spice.time import met_to_j2000ns
from imap_processing.utils import packet_file_to_datasets

logger = logging.getLogger(__name__)

@@ -50,203 +47,208 @@ def hit_l1a(packet_file: str, data_version: str) -> list[xr.Dataset]:
Returns
-------
cdf_filepaths : list[xarray.Dataset]
processed_data : list[xarray.Dataset]
List of Datasets of L1A processed data.
"""
# Decom, sort, and group packets by apid
packets = decom_packets(packet_file)
sorted_packets = utils.sort_by_time(packets, "SHCOARSE")
grouped_data = group_data(sorted_packets)

# create the attribute manager for this data level
# TODO add logging

# Unpack ccsds file
packet_definition = (
imap_module_directory / "hit/packet_definitions/hit_packet_definitions.xml"
)
datasets_by_apid = packet_file_to_datasets(
packet_file=packet_file,
xtce_packet_definition=packet_definition,
use_derived_value=False,
)

# Create the attribute manager for this data level
attr_mgr = ImapCdfAttributes()
attr_mgr.add_instrument_global_attrs(instrument="hit")
attr_mgr.add_instrument_variable_attrs(instrument="hit", level="l1a")
attr_mgr.add_global_attribute("Data_version", data_version)

# Create datasets
datasets = create_datasets(grouped_data, attr_mgr)
# Process science to l1a.
if HitAPID.HIT_HSKP in datasets_by_apid:
datasets_by_apid[HitAPID.HIT_HSKP] = process_housekeeping(
datasets_by_apid[HitAPID.HIT_HSKP], attr_mgr
)
if HitAPID.HIT_SCIENCE in datasets_by_apid:
# TODO complete science data processing
print("Skipping science data for now")
datasets_by_apid[HitAPID.HIT_SCIENCE] = process_science(
datasets_by_apid[HitAPID.HIT_SCIENCE], attr_mgr
)

return list(datasets.values())
return list(datasets_by_apid.values())


def decom_packets(packet_file: str) -> list:
def concatenate_leak_variables(
dataset: xr.Dataset, adc_channels: xr.DataArray
) -> xr.Dataset:
"""
Unpack and decode packets using CCSDS file and XTCE packet definitions.
Concatenate leak variables in the dataset.
Updates the housekeeping dataset to replace the individual
leak_i_00, leak_i_01, ..., leak_i_63 variables with a single
leak_i variable as a 2D array. "i" here represents current
in the leakage current [Voltage] data.
Parameters
----------
packet_file : str
Path to the CCSDS data packet file.
dataset : xarray.Dataset
Dataset containing 64 leak variables.
adc_channels : xarray.DataArray
DataArray to be used as a dimension for the concatenated leak variables.
Returns
-------
unpacked_packets : list
List of all the unpacked data.
dataset : xarray.Dataset
Updated dataset with concatenated leak variables.
"""
# TODO: update path to use a combined packets xtce file
xtce_file = imap_module_directory / "hit/packet_definitions/P_HIT_HSKP.xml"
logger.debug(f"Unpacking {packet_file} using xtce definitions in {xtce_file}")
unpacked_packets: list = decom.decom_packets(packet_file, xtce_file)
logger.debug(f"{packet_file} unpacked")
return unpacked_packets
# Stack 64 leak variables (leak_00, leak_01, ..., leak_63)
leak_vars = [dataset[f"leak_i_{i:02d}"] for i in range(64)]

# Concatenate along 'adc_channels' and reorder dimensions
stacked_leaks = xr.concat(leak_vars, dim=adc_channels).transpose(
"epoch", "adc_channels"
)
dataset["leak_i"] = stacked_leaks

# Drop the individual leak variables
updated_dataset = dataset.drop_vars([f"leak_i_{i:02d}" for i in range(64)])

def group_data(unpacked_data: list) -> dict:
return updated_dataset


def process_science(dataset: xr.Dataset, attr_mgr: ImapCdfAttributes) -> xr.Dataset:
"""
Group data by apid.
Will process science dataset for CDF product.
Process binary science data for CDF creation. The data is
grouped into science frames, decommutated and decompressed,
and split into count rates and event datasets. Updates the
dataset attributes, coordinates, and data variable dimensions
according to specifications in a CDF yaml file.
Parameters
----------
unpacked_data : list
Packet list.
dataset : xarray.Dataset
Dataset containing HIT science data.
attr_mgr : ImapCdfAttributes
Attribute manager used to get the data product field's attributes.
Returns
-------
grouped_data : dict
Grouped data by apid.
dataset : xarray.Dataset
An updated dataset ready for CDF conversion.
"""
logger.debug("Grouping packet values for each apid")
grouped_data: dict = utils.group_by_apid(unpacked_data)
logger.info("Creating HIT L1A science datasets")

# Create data classes for each packet
for apid in grouped_data:
if apid == HitAPID.HIT_HSKP:
logger.debug(f"Grouping housekeeping packets - APID: {apid}")
grouped_data[apid] = [
Housekeeping(packet, "0.0", "hskp_sample.ccsds")
for packet in grouped_data[apid]
]
else:
raise RuntimeError(f"Encountered unexpected APID [{apid}]")
# Logical sources for the two products.
# logical_sources = ["imap_hit_l1a_sci-counts", "imap_hit_l1a_pulse-height-event"]

logger.debug("Finished grouping packet data")
return grouped_data
# TODO: Complete this function
# - call decom_hit.py to decommutate the science data
# - split the science data into count rates and event datasets
# - update dimensions and add attributes to the dataset and data arrays
# - return list of two datasets (count rates and events)?

# logger.info("HIT L1A event dataset created")
# logger.info("HIT L1A count rates dataset created")

def create_datasets(data: dict, attr_mgr: ImapCdfAttributes) -> dict:
return dataset


def process_housekeeping(
dataset: xr.Dataset, attr_mgr: ImapCdfAttributes
) -> xr.Dataset:
"""
Create a dataset for each APID in the data.
Will process housekeeping dataset for CDF product.
Updates the housekeeping dataset to replace the individual
leak variables with a single leak_i variable as a 2D array.
Also updates the dataset attributes, coordinates, and data
variable dimensions according to specifications in a CDF yaml file.
Parameters
----------
data : dict
A single dictionary containing data for all instances of an APID.
dataset : xarray.Dataset
Dataset containing HIT housekeeping data.
attr_mgr : ImapCdfAttributes
Attribute manager used to get the data product field's attributes.
Returns
-------
processed_data : dict
A dictionary containing xarray.Dataset for each APID. Each dataset in the
dictionary will be converted to a CDF.
dataset : xarray.Dataset
An updated dataset ready for CDF conversion.
"""
logger.info("Creating datasets for HIT L1A data")

skip_keys = [
"shcoarse",
"ground_sw_version",
"packet_file_name",
"ccsds_header",
"leak_i_raw",
logger.info("Creating HIT L1A housekeeping dataset")

logical_source = "imap_hit_l1a_hk"

# Drop keys that are not CDF data variables
drop_keys = [
"pkt_apid",
"sc_tick",
"version",
"type",
"sec_hdr_flg",
"seq_flgs",
"src_seq_ctr",
"pkt_len",
"hskp_spare1",
"hskp_spare2",
"hskp_spare3",
"hskp_spare4",
"hskp_spare5",
]

processed_data = {}
for apid, data_packets in data.items():
if apid == HitAPID.HIT_HSKP:
logical_source = "imap_hit_l1a_hk"
# TODO define keys to skip for each apid. Currently just have
# a list for housekeeping. Some of these may change later.
# leak_i_raw can be handled in the housekeeping class as an
# InitVar so that it doesn't show up when you extract the object's
# field names.
elif apid == HitAPID.HIT_SCIENCE:
logical_source = "imap_hit_l1a_sci-counts"
# TODO what about pulse height? It has the same apid.
# Will need to approach this differently
else:
raise Exception(f"Unknown APID [{apid}]")
metadata_arrays = defaultdict(list)
for packet in data_packets:
# Add metadata to an array
for field in fields(packet):
field_name = field.name
field_value = getattr(packet, field_name)
# convert key to lower case to match SPDF requirement
data_key = field_name.lower()
metadata_arrays[data_key].append(field_value)

# Convert integers into datetime64[s]
epoch_converted_times = met_to_j2000ns(metadata_arrays["shcoarse"])

# Create xarray data arrays for dependencies
epoch_time = xr.DataArray(
epoch_converted_times,
name="epoch",
dims=["epoch"],
attrs=attr_mgr.get_variable_attributes("epoch"),
)

adc_channels = xr.DataArray(
np.arange(64, dtype=np.uint16),
name="adc_channels",
dims=["adc_channels"],
attrs=attr_mgr.get_variable_attributes("adc_channels"),
)

# NOTE: LABL_PTR_1 should be CDF_CHAR.
adc_channels_label = xr.DataArray(
adc_channels.values.astype(str),
name="adc_channels_label",
dims=["adc_channels_label"],
attrs=attr_mgr.get_variable_attributes("adc_channels_label"),
)

# Create xarray dataset
dataset = xr.Dataset(
coords={
"epoch": epoch_time,
"adc_channels": adc_channels,
"adc_channels_label": adc_channels_label,
},
attrs=attr_mgr.get_global_attributes(logical_source),
)

# Create xarray data array for each metadata field
for field, data in metadata_arrays.items(): # type: ignore[assignment]
# TODO Error, Incompatible types in assignment
# (expression has type "str", variable has type "Field[Any]")
# AND
# Incompatible types in assignment
# (expression has type "list[Any]", variable has type "dict[Any, Any]")
if field not in skip_keys: # type: ignore[comparison-overlap]
# TODO Error, Non-overlapping container check
# (element type: "Field[Any]", container item type: "str")

# Create a list of all the dimensions using the DEPEND_I keys in the
# attributes
dims = [
value
for key, value in attr_mgr.get_variable_attributes(field).items() # type: ignore[arg-type]
if "DEPEND" in key
]
if field == "leak_i": # type: ignore[comparison-overlap]
# TODO Error, Non-overlapping equality check
# (left operand type: "Field[Any]",
# right operand type: "Literal['leak_i']")

# 2D array - needs two dims
dataset[field] = xr.DataArray(
data,
dims=dims,
attrs=attr_mgr.get_variable_attributes(field), # type: ignore[arg-type]
)
else:
dataset[field] = xr.DataArray(
data,
dims=dims,
attrs=attr_mgr.get_variable_attributes(field), # type: ignore[arg-type]
)
processed_data[apid] = dataset
logger.info("HIT L1A datasets created")
return processed_data
# Drop variables not needed for CDF
dataset = dataset.drop_vars(drop_keys)

# Create data arrays for dependencies
adc_channels = xr.DataArray(
np.arange(64, dtype=np.uint8),
name="adc_channels",
dims=["adc_channels"],
attrs=attr_mgr.get_variable_attributes("adc_channels"),
)

# NOTE: LABL_PTR_1 should be CDF_CHAR.
adc_channels_label = xr.DataArray(
adc_channels.values.astype(str),
name="adc_channels_label",
dims=["adc_channels_label"],
attrs=attr_mgr.get_variable_attributes("adc_channels_label"),
)

# Update dataset coordinates and attributes
dataset = dataset.assign_coords(
{
"adc_channels": adc_channels,
"adc_channels_label": adc_channels_label,
}
)
dataset.attrs = attr_mgr.get_global_attributes(logical_source)

# Stack 64 leak variables (leak_i_00, leak_i_01, ..., leak_i_63)
dataset = concatenate_leak_variables(dataset, adc_channels)

# Assign attributes and dimensions to each data array in the Dataset
for field in dataset.data_vars.keys():
# Create a dict of dimensions using the DEPEND_I keys in the
# attributes
dims = {
key: value
for key, value in attr_mgr.get_variable_attributes(field).items()
if "DEPEND" in key
}
dataset[field].attrs = attr_mgr.get_variable_attributes(field)
dataset[field].assign_coords(dims)

dataset.epoch.attrs = attr_mgr.get_variable_attributes("epoch")

return dataset
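
For context, a hedged usage sketch of the refactored entry point (the packet file name is hypothetical; the hit_l1a signature and return type are taken from the diff above):

import xarray as xr

from imap_processing.hit.l1a.hit_l1a import hit_l1a

# Hypothetical local HIT L0 CCSDS file containing housekeeping/science APIDs
datasets: list[xr.Dataset] = hit_l1a(
    packet_file="hit_raw_sample.ccsds", data_version="001"
)

for ds in datasets:
    # Logical_source is assumed to be set by the attribute manager
    print(ds.attrs.get("Logical_source"), dict(ds.sizes))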