Refactor hit_l1a.py to use packet_file_to_datasets function #828

Merged
324 changes: 163 additions & 161 deletions imap_processing/hit/l1a/hit_l1a.py
@@ -1,17 +1,14 @@
"""Decommutate HIT CCSDS data and create L1a data products."""

import logging
from collections import defaultdict
from dataclasses import fields
from enum import IntEnum

import numpy as np
import xarray as xr

from imap_processing import decom, imap_module_directory, utils
from imap_processing import imap_module_directory
from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
from imap_processing.hit.l0.data_classes.housekeeping import Housekeeping
from imap_processing.spice.time import met_to_j2000ns
from imap_processing.utils import packet_file_to_datasets

logger = logging.getLogger(__name__)

@@ -50,203 +47,208 @@ def hit_l1a(packet_file: str, data_version: str) -> list[xr.Dataset]:
Returns
-------
cdf_filepaths : list[xarray.Dataset]
processed_data : list[xarray.Dataset]
List of Datasets of L1A processed data.
"""
# Decom, sort, and group packets by apid
packets = decom_packets(packet_file)
sorted_packets = utils.sort_by_time(packets, "SHCOARSE")
grouped_data = group_data(sorted_packets)

# create the attribute manager for this data level
# TODO add logging

# Unpack ccsds file
packet_definition = (
imap_module_directory / "hit/packet_definitions/hit_packet_definitions.xml"
)
datasets_by_apid = packet_file_to_datasets(
packet_file=packet_file,
xtce_packet_definition=packet_definition,
use_derived_value=False,
)

# Create the attribute manager for this data level
attr_mgr = ImapCdfAttributes()
attr_mgr.add_instrument_global_attrs(instrument="hit")
attr_mgr.add_instrument_variable_attrs(instrument="hit", level="l1a")
attr_mgr.add_global_attribute("Data_version", data_version)

# Create datasets
datasets = create_datasets(grouped_data, attr_mgr)
# Process science to l1a.
if HitAPID.HIT_HSKP in datasets_by_apid:
datasets_by_apid[HitAPID.HIT_HSKP] = process_housekeeping(
datasets_by_apid[HitAPID.HIT_HSKP], attr_mgr
)
if HitAPID.HIT_SCIENCE in datasets_by_apid:
# TODO complete science data processing
print("Skipping science data for now")
datasets_by_apid[HitAPID.HIT_SCIENCE] = process_science(
datasets_by_apid[HitAPID.HIT_SCIENCE], attr_mgr
)

return list(datasets.values())
return list(datasets_by_apid.values())


def decom_packets(packet_file: str) -> list:
def concatenate_leak_variables(
dataset: xr.Dataset, adc_channels: xr.DataArray
) -> xr.Dataset:
"""
Unpack and decode packets using CCSDS file and XTCE packet definitions.
Concatenate leak variables in the dataset.
Updates the housekeeping dataset to replace the individual
leak_i_00, leak_i_01, ..., leak_i_63 variables with a single
leak_i variable as a 2D array. "i" here represents current
in the leakage current [Voltage] data.
Parameters
----------
packet_file : str
Path to the CCSDS data packet file.
dataset : xarray.Dataset
Dataset containing 64 leak variables.
adc_channels : xarray.DataArray
DataArray to be used as a dimension for the concatenated leak variables.
Returns
-------
unpacked_packets : list
List of all the unpacked data.
dataset : xarray.Dataset
Updated dataset with concatenated leak variables.
"""
# TODO: update path to use a combined packets xtce file
xtce_file = imap_module_directory / "hit/packet_definitions/P_HIT_HSKP.xml"
logger.debug(f"Unpacking {packet_file} using xtce definitions in {xtce_file}")
unpacked_packets: list = decom.decom_packets(packet_file, xtce_file)
logger.debug(f"{packet_file} unpacked")
return unpacked_packets
# Stack 64 leak variables (leak_00, leak_01, ..., leak_63)
leak_vars = [dataset[f"leak_i_{i:02d}"] for i in range(64)]

# Concatenate along 'adc_channels' and reorder dimensions
stacked_leaks = xr.concat(leak_vars, dim=adc_channels).transpose(
"epoch", "adc_channels"
)
dataset["leak_i"] = stacked_leaks

# Drop the individual leak variables
updated_dataset = dataset.drop_vars([f"leak_i_{i:02d}" for i in range(64)])

def group_data(unpacked_data: list) -> dict:
return updated_dataset
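
For reference, a minimal, self-contained sketch of the same stack-and-transpose pattern on synthetic data (the epoch length and random leak values here are made up for illustration; this is not part of the diff):

import numpy as np
import xarray as xr

n_epochs = 3
adc_channels = xr.DataArray(
    np.arange(64, dtype=np.uint8), name="adc_channels", dims=["adc_channels"]
)

# 64 separate 1D housekeeping variables, one value per epoch per channel
dataset = xr.Dataset(
    {
        f"leak_i_{i:02d}": xr.DataArray(np.random.rand(n_epochs), dims=["epoch"])
        for i in range(64)
    },
    coords={"epoch": np.arange(n_epochs)},
)

# Stack along the new adc_channels dimension, reorder to (epoch, adc_channels),
# and drop the per-channel variables
leak_vars = [dataset[f"leak_i_{i:02d}"] for i in range(64)]
dataset["leak_i"] = xr.concat(leak_vars, dim=adc_channels).transpose(
    "epoch", "adc_channels"
)
dataset = dataset.drop_vars([f"leak_i_{i:02d}" for i in range(64)])

print(dataset["leak_i"].shape)  # (3, 64)
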


def process_science(dataset: xr.Dataset, attr_mgr: ImapCdfAttributes) -> xr.Dataset:
"""
Group data by apid.
Will process science dataset for CDF product.
Process binary science data for CDF creation. The data is
grouped into science frames, decommutated and decompressed,
and split into count rates and event datasets. Updates the
dataset attributes and coordinates and data variable
dimensions according to specifications in a cdf yaml file.
Parameters
----------
unpacked_data : list
Packet list.
dataset : xarray.Dataset
Dataset containing HIT science data.
attr_mgr : ImapCdfAttributes
Attribute manager used to get the data product field's attributes.
Returns
-------
grouped_data : dict
Grouped data by apid.
dataset : xarray.Dataset
An updated dataset ready for CDF conversion.
"""
logger.debug("Grouping packet values for each apid")
grouped_data: dict = utils.group_by_apid(unpacked_data)
logger.info("Creating HIT L1A science datasets")

# Create data classes for each packet
for apid in grouped_data:
if apid == HitAPID.HIT_HSKP:
logger.debug(f"Grouping housekeeping packets - APID: {apid}")
grouped_data[apid] = [
Housekeeping(packet, "0.0", "hskp_sample.ccsds")
for packet in grouped_data[apid]
]
else:
raise RuntimeError(f"Encountered unexpected APID [{apid}]")
# Logical sources for the two products.
# logical_sources = ["imap_hit_l1a_sci-counts", "imap_hit_l1a_pulse-height-event"]

logger.debug("Finished grouping packet data")
return grouped_data
# TODO: Complete this function
# - call decom_hit.py to decommutate the science data
# - split the science data into count rates and event datasets
# - update dimensions and add attributes to the dataset and data arrays
# - return list of two datasets (count rates and events)?

# logger.info("HIT L1A event dataset created")
# logger.info("HIT L1A count rates dataset created")

def create_datasets(data: dict, attr_mgr: ImapCdfAttributes) -> dict:
return dataset


def process_housekeeping(
dataset: xr.Dataset, attr_mgr: ImapCdfAttributes
) -> xr.Dataset:
"""
Create a dataset for each APID in the data.
Will process housekeeping dataset for CDF product.
Updates the housekeeping dataset to replace the individual leak
variables with a single leak_i variable as a 2D array. Also updates the dataset
attributes and coordinates and data variable dimensions
according to specifications in a cdf yaml file.
Parameters
----------
data : dict
A single dictionary containing data for all instances of an APID.
dataset : xarray.Dataset
Dataset containing HIT housekeeping data.
attr_mgr : ImapCdfAttributes
Attribute manager used to get the data product field's attributes.
Returns
-------
processed_data : dict
A dictionary containing xarray.Dataset for each APID. Each dataset in the
dictionary will be converted to a CDF.
dataset : xarray.Dataset
An updated dataset ready for CDF conversion.
"""
logger.info("Creating datasets for HIT L1A data")

skip_keys = [
"shcoarse",
"ground_sw_version",
"packet_file_name",
"ccsds_header",
"leak_i_raw",
logger.info("Creating HIT L1A housekeeping dataset")

logical_source = "imap_hit_l1a_hk"
Collaborator: Should this be in the CDF metadata?

vmartinez-cu (Contributor, author), Sep 18, 2024:

The logical_source variable is used to grab the HIT L1A instrument-level attributes from imap_hit_global_cdf_attrs.yaml when updating the dataset attributes: dataset.attrs = attr_mgr.get_global_attributes(logical_source)

Attributes from imap_hit_global_cdf_attrs.yaml for L1A housekeeping:

imap_hit_l1a_hk:
    <<: *instrument_base
    Data_level: 1A
    Data_type: L1A_HK>Level-1A Housekeeping
    Logical_source: imap_hit_l1a_hk
    Logical_source_description: IMAP Mission HIT Instrument Level-1A Housekeeping Data.

Would it help to rename this variable, or to plug the value directly into the line that updates the dataset attributes, i.e. dataset.attrs = attr_mgr.get_global_attributes("imap_hit_l1a_hk")?
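
For illustration only (not part of this diff), a minimal sketch of that lookup, using only the ImapCdfAttributes calls already present in this file and assuming get_global_attributes returns the merged attribute dict for the given key:

from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes

attr_mgr = ImapCdfAttributes()
attr_mgr.add_instrument_global_attrs(instrument="hit")

# "imap_hit_l1a_hk" selects the L1A housekeeping block shown above from
# imap_hit_global_cdf_attrs.yaml
logical_source = "imap_hit_l1a_hk"
global_attrs = attr_mgr.get_global_attributes(logical_source)
print(global_attrs["Logical_source_description"])
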


# Drop keys that are not CDF data variables
drop_keys = [
"pkt_apid",
"sc_tick",
"version",
"type",
"sec_hdr_flg",
"seq_flgs",
"src_seq_ctr",
"pkt_len",
"hskp_spare1",
"hskp_spare2",
"hskp_spare3",
"hskp_spare4",
"hskp_spare5",
]

processed_data = {}
for apid, data_packets in data.items():
if apid == HitAPID.HIT_HSKP:
logical_source = "imap_hit_l1a_hk"
# TODO define keys to skip for each apid. Currently just have
# a list for housekeeping. Some of these may change later.
# leak_i_raw can be handled in the housekeeping class as an
# InitVar so that it doesn't show up when you extract the object's
# field names.
elif apid == HitAPID.HIT_SCIENCE:
logical_source = "imap_hit_l1a_sci-counts"
# TODO what about pulse height? It has the same apid.
# Will need to approach this differently
else:
raise Exception(f"Unknown APID [{apid}]")
metadata_arrays = defaultdict(list)
for packet in data_packets:
# Add metadata to an array
for field in fields(packet):
field_name = field.name
field_value = getattr(packet, field_name)
# convert key to lower case to match SPDF requirement
data_key = field_name.lower()
metadata_arrays[data_key].append(field_value)

# Convert integers into datetime64[s]
epoch_converted_times = met_to_j2000ns(metadata_arrays["shcoarse"])

# Create xarray data arrays for dependencies
epoch_time = xr.DataArray(
epoch_converted_times,
name="epoch",
dims=["epoch"],
attrs=attr_mgr.get_variable_attributes("epoch"),
)

adc_channels = xr.DataArray(
np.arange(64, dtype=np.uint16),
name="adc_channels",
dims=["adc_channels"],
attrs=attr_mgr.get_variable_attributes("adc_channels"),
)

# NOTE: LABL_PTR_1 should be CDF_CHAR.
adc_channels_label = xr.DataArray(
adc_channels.values.astype(str),
name="adc_channels_label",
dims=["adc_channels_label"],
attrs=attr_mgr.get_variable_attributes("adc_channels_label"),
)

# Create xarray dataset
dataset = xr.Dataset(
coords={
"epoch": epoch_time,
"adc_channels": adc_channels,
"adc_channels_label": adc_channels_label,
},
attrs=attr_mgr.get_global_attributes(logical_source),
)

# Create xarray data array for each metadata field
for field, data in metadata_arrays.items(): # type: ignore[assignment]
# TODO Error, Incompatible types in assignment
# (expression has type "str", variable has type "Field[Any]")
# AND
# Incompatible types in assignment
# (expression has type "list[Any]", variable has type "dict[Any, Any]")
if field not in skip_keys: # type: ignore[comparison-overlap]
# TODO Error, Non-overlapping container check
# (element type: "Field[Any]", container item type: "str")

# Create a list of all the dimensions using the DEPEND_I keys in the
# attributes
dims = [
value
for key, value in attr_mgr.get_variable_attributes(field).items() # type: ignore[arg-type]
if "DEPEND" in key
]
if field == "leak_i": # type: ignore[comparison-overlap]
# TODO Error, Non-overlapping equality check
# (left operand type: "Field[Any]",
# right operand type: "Literal['leak_i']")

# 2D array - needs two dims
dataset[field] = xr.DataArray(
data,
dims=dims,
attrs=attr_mgr.get_variable_attributes(field), # type: ignore[arg-type]
)
else:
dataset[field] = xr.DataArray(
data,
dims=dims,
attrs=attr_mgr.get_variable_attributes(field), # type: ignore[arg-type]
)
processed_data[apid] = dataset
logger.info("HIT L1A datasets created")
return processed_data
# Drop variables not needed for CDF
dataset = dataset.drop_vars(drop_keys)

# Create data arrays for dependencies
adc_channels = xr.DataArray(
np.arange(64, dtype=np.uint8),
name="adc_channels",
dims=["adc_channels"],
attrs=attr_mgr.get_variable_attributes("adc_channels"),
)

# NOTE: LABL_PTR_1 should be CDF_CHAR.
adc_channels_label = xr.DataArray(
adc_channels.values.astype(str),
name="adc_channels_label",
dims=["adc_channels_label"],
attrs=attr_mgr.get_variable_attributes("adc_channels_label"),
)

# Update dataset coordinates and attributes
dataset = dataset.assign_coords(
{
"adc_channels": adc_channels,
"adc_channels_label": adc_channels_label,
Collaborator: I forget, do the "labels" need to be defined as coordinates as well, or are those variables that are dependent on the coordinates? i.e., should adc_channels_label actually be in the data_vars section?

Contributor (author): I think it needs to be added to the coordinates as well, because I initially didn't have it in there but then had to add it. I don't remember why, though. I'll get clarification on this.

Contributor (author): I checked with Tenzin and she explained that recent cdflib updates require this for data with more than one dimension.
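
A sketch outside the diff, illustrating only the xarray side of the point above (the cdflib requirement itself is not shown): the label array is attached as a coordinate alongside adc_channels, so it does not appear under data_vars.

import numpy as np
import xarray as xr

adc_channels = xr.DataArray(
    np.arange(64, dtype=np.uint8), name="adc_channels", dims=["adc_channels"]
)
adc_channels_label = xr.DataArray(
    adc_channels.values.astype(str),
    name="adc_channels_label",
    dims=["adc_channels_label"],
)

dataset = xr.Dataset(coords={"epoch": np.arange(3)})
dataset = dataset.assign_coords(
    {"adc_channels": adc_channels, "adc_channels_label": adc_channels_label}
)
dataset["leak_i"] = xr.DataArray(np.zeros((3, 64)), dims=["epoch", "adc_channels"])

# The label ends up as a coordinate, not a data variable
print(sorted(dataset.coords))   # ['adc_channels', 'adc_channels_label', 'epoch']
print(list(dataset.data_vars))  # ['leak_i']
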

}
)
dataset.attrs = attr_mgr.get_global_attributes(logical_source)

# Stack 64 leak variables (leak_00, leak_01, ..., leak_63)
dataset = concatenate_leak_variables(dataset, adc_channels)

# Assign attributes and dimensions to each data array in the Dataset
for field in dataset.data_vars.keys():
# Create a dict of dimensions using the DEPEND_I keys in the
# attributes
dims = {
key: value
for key, value in attr_mgr.get_variable_attributes(field).items()
if "DEPEND" in key
}
dataset[field].attrs = attr_mgr.get_variable_attributes(field)
dataset[field].assign_coords(dims)

dataset.epoch.attrs = attr_mgr.get_variable_attributes("epoch")

return dataset