Implement packet_file_to_datasets function for CoDICE #804
@@ -13,23 +13,19 @@
 from __future__ import annotations

-import collections
 import logging
 from pathlib import Path

 import numpy as np
 import pandas as pd
-import space_packet_parser
 import xarray as xr

 from imap_processing import imap_module_directory
 from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
-from imap_processing.cdf.utils import met_to_j2000ns
 from imap_processing.codice import constants
-from imap_processing.codice.codice_l0 import decom_packets
 from imap_processing.codice.decompress import decompress
-from imap_processing.codice.utils import CODICEAPID, add_metadata_to_array
-from imap_processing.utils import group_by_apid, sort_by_time
+from imap_processing.codice.utils import CODICEAPID
+from imap_processing.utils import packet_file_to_datasets

 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
@@ -39,7 +35,6 @@
 # TODO: Add support for decomming multiple APIDs from a single file
 # TODO: Add these as variables in CDF: SPIN_PERIOD, ST_BIAS_GAIN_MODE,
 #       SW_BIAS_GAIN_MODE, RGFO_HALF_SPIN, NSO_HALF_SPIN, DATA_QUALITY
-# TODO: Use new packet_file_to_dataset() function to simplify things
 # TODO: Determine what should go in event data CDF and how it should be
 #       structured.
 # TODO: Make sure CDF attributes match expected nomenclature
@@ -104,16 +99,18 @@ def configure_data_products(self, apid: int) -> None:
         self.dataset_name = config["dataset_name"]
         self.instrument = config["instrument"]

-    def create_science_dataset(self, met: np.int64, data_version: str) -> xr.Dataset:
+    def create_science_dataset(
+        self, packet: xr.Dataset, data_version: str
+    ) -> xr.Dataset:
         """
         Create an ``xarray`` dataset for the unpacked science data.

         The dataset can then be written to a CDF file.

         Parameters
         ----------
-        met : numpy.int64
-            The mission elapsed time of the packet, used to determine epoch data.
+        packet : xarray.Dataset
+            The packet to process.
         data_version : str
             Version of the data product being created.
@@ -130,7 +127,7 @@ def create_science_dataset(self, met: np.int64, data_version: str) -> xr.Dataset

         # Define coordinates
         epoch = xr.DataArray(
-            [met_to_j2000ns(met)],
+            packet.epoch,
             name="epoch",
             dims=["epoch"],
             attrs=cdf_attrs.get_variable_attributes("epoch"),
@@ -179,12 +176,22 @@ def create_science_dataset(self, met: np.int64, data_version: str) -> xr.Dataset
         # Data arrays are structured depending on the instrument
         if self.instrument == "lo":
             variable_data_arr = np.array(variable_data).reshape(
-                1, self.num_positions, self.num_spin_sectors, self.num_energy_steps
+                (
+                    1,
+                    self.num_positions,
+                    self.num_spin_sectors,
+                    self.num_energy_steps,
+                )
             )
             dims = ["epoch", "inst_az", "spin_sector", "energy"]
         elif self.instrument == "hi":
             variable_data_arr = np.array(variable_data).reshape(
-                1, self.num_energy_steps, self.num_positions, self.num_spin_sectors
+                (
+                    1,
+                    self.num_energy_steps,
+                    self.num_positions,
+                    self.num_spin_sectors,
+                )
             )
             dims = ["epoch", "energy", "inst_az", "spin_sector"]
@@ -309,49 +316,49 @@ def unpack_science_data(self, science_values: str) -> None:
         science_values : str
             A string of binary data representing the science values of the data.
         """
-        self.compression_algorithm = constants.LO_COMPRESSION_ID_LOOKUP[self.view_id]
+        compression_algorithm = constants.LO_COMPRESSION_ID_LOOKUP[self.view_id]

Review comment: When I originally wrote this, I thought that I might be using the …

         # Decompress the binary string into a list of integers
-        science_values_decompressed = decompress(
-            science_values, self.compression_algorithm
-        )
+        science_values_decompressed = decompress(science_values, compression_algorithm)

         # Re-arrange the counter data
         # For CoDICE-lo, data are 3D arrays with a shape representing
         # [<num_positions>,<num_spin_sectors>,<num_energy_steps>]
         if self.instrument == "lo":
             self.data = np.array(science_values_decompressed, dtype=np.uint32).reshape(
-                self.num_counters,
-                self.num_positions,
-                self.num_spin_sectors,
-                self.num_energy_steps,
+                (
+                    self.num_counters,
+                    self.num_positions,
+                    self.num_spin_sectors,
+                    self.num_energy_steps,
+                )
             )

         # For CoDICE-hi, data are a 3D array with a shape representing
         # [<num_energy_steps>,<num_positions>,<num_spin_sectors>]
         elif self.instrument == "hi":
             self.data = np.array(science_values_decompressed, dtype=np.uint32).reshape(
-                self.num_counters,
-                self.num_energy_steps,
-                self.num_positions,
-                self.num_spin_sectors,
+                (
+                    self.num_counters,
+                    self.num_energy_steps,
+                    self.num_positions,
+                    self.num_spin_sectors,
+                )
             )
 def create_event_dataset(
-    met: list[int], event_data: str, dataset_name: str, data_version: str
+    apid: int, packet: xr.Dataset, data_version: str
 ) -> xr.Dataset:
     """
     Create dataset for event data.

     Parameters
     ----------
-    met : list[int]
-        The Mission Elapsed Time of the data.
-    event_data : str
-        A string of binary numbers representing the event data.
-    dataset_name : str
-        The name for the dataset.
+    apid : int
+        The APID of the packet.
+    packet : xarray.Dataset
+        The packet to process.
     data_version : str
         Version of the data product being created.

Review comment on lines +360 to +361: full_dataset? I think this is the entire packet file you are passing in and not an individual packet.

Reply: No, I think this is just an individual packet. Here is how it is called:

    packets = packet_file_to_datasets(file_path, xtce_packet_definition)
    for apid in packets:
        packet = packets[apid]
        logger.info(f"\nProcessing {CODICEAPID(apid).name} packet")
        if apid == CODICEAPID.COD_NHK:
            dataset = create_hskp_dataset(packet, data_version)
        elif apid in [CODICEAPID.COD_LO_PHA, CODICEAPID.COD_HI_PHA]:
            dataset = create_event_dataset(apid, packet, data_version)
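The disagreement above is checkable directly: a minimal sketch, assuming packet_file_to_datasets returns a mapping from APID to an xarray.Dataset whose entries are stacked along the epoch dimension (an inference from how the result is indexed in this PR, not a confirmed API contract):

    import xarray as xr

    def summarize_apid_datasets(packets: dict[int, xr.Dataset]) -> None:
        """Hypothetical helper: report how much each per-APID dataset holds."""
        for apid, dataset in packets.items():
            # A size greater than 1 along epoch would mean the "packet" passed
            # to the create_*_dataset functions bundles every packet of this
            # APID, rather than a single one.
            print(f"APID {apid}: {dataset.sizes['epoch']} packet(s), "
                  f"variables={list(dataset.data_vars)}")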
@@ -360,14 +367,22 @@ def create_event_dataset(
     dataset : xarray.Dataset
         Xarray dataset containing the event data.
     """
+    if apid == CODICEAPID.COD_LO_PHA:
+        dataset_name = "imap_codice_l1a_lo_pha"
+    elif apid == CODICEAPID.COD_HI_PHA:
+        dataset_name = "imap_codice_l1a_hi_pha"
+
+    # Extract the data
+    # event_data = packet.event_data.data (Currently turned off, see TODO)
+
     cdf_attrs = ImapCdfAttributes()
     cdf_attrs.add_instrument_global_attrs("codice")
     cdf_attrs.add_instrument_variable_attrs("codice", "l1a")
     cdf_attrs.add_global_attribute("Data_version", data_version)

     # Define coordinates
     epoch = xr.DataArray(
-        met_to_j2000ns(met),  # TODO: Fix after SIT-3 (see note below)
+        packet.epoch,
         name="epoch",
         dims=["epoch"],
         attrs=cdf_attrs.get_variable_attributes("epoch"),
@@ -385,16 +400,16 @@ def create_event_dataset(

 def create_hskp_dataset(
-    packets: list[space_packet_parser.parser.Packet],
+    packet: xr.Dataset,
     data_version: str,
 ) -> xr.Dataset:
     """
     Create dataset for each metadata field for housekeeping data.

     Parameters
     ----------
-    packets : list[space_packet_parser.parser.Packet]
-        The list of packets to process.
+    packet : xarray.Dataset
+        The packet to process.
     data_version : str
         Version of the data product being created.
@@ -408,16 +423,8 @@ def create_hskp_dataset(
     cdf_attrs.add_instrument_variable_attrs("codice", "l1a")
     cdf_attrs.add_global_attribute("Data_version", data_version)

-    metadata_arrays: dict = collections.defaultdict(list)
-
-    for packet in packets:
-        add_metadata_to_array(packet, metadata_arrays)
-
     epoch = xr.DataArray(
-        met_to_j2000ns(
-            metadata_arrays["SHCOARSE"],
-            reference_epoch=np.datetime64("2010-01-01T00:01:06.184", "ns"),
-        ),
+        packet.epoch,
         name="epoch",
         dims=["epoch"],
         attrs=cdf_attrs.get_variable_attributes("epoch"),
@@ -438,19 +445,21 @@ def create_hskp_dataset(
     # I am holding off making this change until I acquire updated
     # housekeeping packets/validation data that match the latest telemetry
     # definitions
-    for key, value in metadata_arrays.items():
+    for variable in packet:
         attrs = cdf_attrs.get_variable_attributes("codice_support_attrs")
         attrs["CATDESC"] = "TBD"
         attrs["DEPEND_0"] = "epoch"
         attrs["FIELDNAM"] = "TBD"
-        attrs["LABLAXIS"] = key
+        attrs["LABLAXIS"] = variable

-        dataset[key] = xr.DataArray(value, dims=["epoch"], attrs=attrs)
+        dataset[variable] = xr.DataArray(
+            packet[variable].data, dims=["epoch"], attrs=attrs
+        )

     return dataset
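A note on the loop above: iterating an xarray.Dataset yields the names of its data variables, which is what the replacement of metadata_arrays.items() relies on. A minimal sketch with made-up variable names:

    import numpy as np
    import xarray as xr

    packet = xr.Dataset(
        {"shcoarse": ("epoch", np.array([0, 1])), "voltage": ("epoch", np.array([5, 6]))}
    )
    for variable in packet:  # iterates over data-variable names, e.g. "shcoarse"
        print(variable, packet[variable].data)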
-def get_params(packet: space_packet_parser.parser.Packet) -> tuple[int, int, int, int]:
+def get_params(packet: xr.Dataset) -> tuple[int, int, int, int]:
     """
     Return the four 'main' parameters used for l1a processing.
@@ -460,7 +469,7 @@ def get_params(packet: space_packet_parser.parser.Packet) -> tuple[int, int, int

     Parameters
     ----------
-    packet : space_packet_parser.parser.Packet
+    packet : xarray.Dataset
         A packet for the APID of interest.

     Returns
@@ -479,10 +488,10 @@ def get_params(packet: space_packet_parser.parser.Packet) -> tuple[int, int, int
     view_id : int
         Provides information about how data was collapsed and/or compressed.
     """
-    table_id = packet.data["TABLE_ID"].raw_value
-    plan_id = packet.data["PLAN_ID"].raw_value
-    plan_step = packet.data["PLAN_STEP"].raw_value
-    view_id = packet.data["VIEW_ID"].raw_value
+    table_id = int(packet.table_id.data)
+    plan_id = int(packet.plan_id.data)
+    plan_step = int(packet.plan_step.data)
+    view_id = int(packet.view_id.data)

     return table_id, plan_id, plan_step, view_id
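The int(packet.table_id.data) pattern assumes each parameter variable carries a single value for the packet in hand; a minimal sketch with a toy dataset (the value is illustrative):

    import numpy as np
    import xarray as xr

    packet = xr.Dataset({"table_id": ("epoch", np.array([3], dtype=np.uint8))})

    # Converts the one-element array to a plain Python int; this is only
    # well-defined when the variable holds exactly one value.
    table_id = int(packet.table_id.data)
    assert table_id == 3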
@@ -504,54 +513,33 @@ def process_codice_l1a(file_path: Path, data_version: str) -> xr.Dataset:
         The ``xarray`` dataset containing the science data and supporting metadata.
     """
-    # Decom the packets, group data by APID, and sort by time
-    packets = decom_packets(file_path)
-    grouped_data = group_by_apid(packets)
+    xtce_packet_definition = Path(
+        f"{imap_module_directory}/codice/packet_definitions/{constants.PACKET_TO_XTCE_MAPPING[file_path.name]}"
+    )
+    packets = packet_file_to_datasets(file_path, xtce_packet_definition)

-    for apid in grouped_data:
+    for apid in packets:
+        packet = packets[apid]
         logger.info(f"\nProcessing {CODICEAPID(apid).name} packet")

         if apid == CODICEAPID.COD_NHK:
-            packets = grouped_data[apid]
-            sorted_packets = sort_by_time(packets, "SHCOARSE")
-            dataset = create_hskp_dataset(sorted_packets, data_version)
+            dataset = create_hskp_dataset(packet, data_version)

         elif apid in [CODICEAPID.COD_LO_PHA, CODICEAPID.COD_HI_PHA]:
-            if apid == CODICEAPID.COD_LO_PHA:
-                dataset_name = "imap_codice_l1a_lo_pha"
-            elif apid == CODICEAPID.COD_HI_PHA:
-                dataset_name = "imap_codice_l1a_hi_pha"
-
-            # Sort the packets by time
-            packets = sort_by_time(grouped_data[apid], "SHCOARSE")
-
-            # Determine the start time of the packet
-            met = packets[0].data["ACQ_START_SECONDS"].raw_value
-            met = [met, met + 1]  # TODO: Remove after cdflib fix
-
-            # Extract the data
-            event_data = packets[0].data["EVENT_DATA"].raw_value
-
-            # Create the dataset
-            dataset = create_event_dataset(met, event_data, dataset_name, data_version)
+            dataset = create_event_dataset(apid, packet, data_version)

         elif apid in constants.APIDS_FOR_SCIENCE_PROCESSING:
-            # Sort the packets by time
-            packets = sort_by_time(grouped_data[apid], "SHCOARSE")
-
-            # Determine the start time of the packet
-            met = packets[0].data["ACQ_START_SECONDS"].raw_value
-
             # Extract the data
-            science_values = packets[0].data["DATA"].raw_value
+            science_values = packet.data.data[0]

             # Get the four "main" parameters for processing
-            table_id, plan_id, plan_step, view_id = get_params(packets[0])
+            table_id, plan_id, plan_step, view_id = get_params(packet)

             # Run the pipeline to create a dataset for the product
             pipeline = CoDICEL1aPipeline(table_id, plan_id, plan_step, view_id)
             pipeline.configure_data_products(apid)
             pipeline.unpack_science_data(science_values)
-            dataset = pipeline.create_science_dataset(met, data_version)
+            dataset = pipeline.create_science_dataset(packet, data_version)

     logger.info(f"\nFinal data product:\n{dataset}\n")
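For context, a hypothetical call into the rewritten entry point (the packet file name below is invented; a real one must be a key in constants.PACKET_TO_XTCE_MAPPING):

    from pathlib import Path

    dataset = process_codice_l1a(Path("imap_codice_l0_raw_20240429_v001.pkts"), "001")
    print(dataset)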
Review comment: This change is to avoid "unexpected argument" warnings from np.reshape().
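The warning that comment refers to presumably comes from static analysis rather than runtime: numpy's ndarray.reshape accepts either separate dimension arguments or a single shape tuple, but some checkers only recognize the tuple form in the documented signature. A quick illustration (shapes are arbitrary):

    import numpy as np

    counts = np.arange(24, dtype=np.uint32)

    a = counts.reshape(2, 3, 4)    # separate args: works, but may be flagged
    b = counts.reshape((2, 3, 4))  # single tuple: matches the stub signature

    assert (a == b).all()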