Commit

WIP/MNT: Update SWE to use packet_file_to_datasets

This switches SWE over to packet_file_to_datasets rather than
iterating through all of the packets individually.
greglucas committed Aug 30, 2024
1 parent 373f9f3 commit c70e024
Showing 13 changed files with 346 additions and 388 deletions.
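The core of the change: instead of decommutating packets one at a time and then grouping and sorting them, the new code makes a single call to packet_file_to_datasets, which parses a raw CCSDS packet file against an XTCE definition and returns one xarray.Dataset per APID. A minimal sketch mirroring the code added in this commit (the input file name is hypothetical):

    from imap_processing import imap_module_directory
    from imap_processing.utils import packet_file_to_datasets

    # XTCE telemetry definition used to decommutate the SWE packets
    xtce_document = (
        f"{imap_module_directory}/swe/packet_definitions/swe_packet_definition.xml"
    )
    # Returns a dict mapping each APID found in the file to an xarray.Dataset;
    # use_derived_value=False requests raw rather than derived telemetry values,
    # as in the diff below.
    datasets_by_apid = packet_file_to_datasets(
        "swe_raw_packets.pkts",  # hypothetical input file
        xtce_document,
        use_derived_value=False,
    )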
9 changes: 0 additions & 9 deletions docs/source/code-documentation/swe.rst
@@ -8,15 +8,6 @@ SWE
 This is the SWE (Solar Wind Electrons) Instrument module, which contains the code for processing
 data from the SWE instrument.
 
-The L0 code to decommutate the CCSDS packet data can be found below.
-
-.. autosummary::
-    :toctree: generated/
-    :template: autosummary.rst
-    :recursive:
-
-    l0.decom_swe
-
 The L1A code to unpack electron counts can be found below.
 
 .. autosummary::
@@ -70,7 +70,7 @@ raw_counts:
   FIELDNAM: Raw Counts
   FORMAT: I3
   UNITS: counts
-  VALIDMAX: 256
+  VALIDMAX: 255
   VALIDMIN: 0
   VAR_TYPE: data
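A likely rationale for this one-line fix, given the raw-counts handling elsewhere in this commit: the raw counts are unsigned 8-bit values, so the largest representable count is 2^8 - 1 = 255; a VALIDMAX of 256 was off by one.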
Empty file removed imap_processing/swe/l0/__init__.py
Empty file.
25 changes: 0 additions & 25 deletions imap_processing/swe/l0/decom_swe.py

This file was deleted.

33 changes: 16 additions & 17 deletions imap_processing/swe/l1a/swe_l1a.py
@@ -4,17 +4,17 @@
 
 import xarray as xr
 
-from imap_processing.swe.l0 import decom_swe
+from imap_processing import imap_module_directory
 from imap_processing.swe.l1a.swe_science import swe_science
 from imap_processing.swe.utils.swe_utils import (
     SWEAPID,
 )
-from imap_processing.utils import group_by_apid, sort_by_time
+from imap_processing.utils import packet_file_to_datasets
 
 logger = logging.getLogger(__name__)
 
 
-def swe_l1a(file_path: str, data_version: str) -> xr.Dataset:
+def swe_l1a(packet_file: str, data_version: str) -> xr.Dataset:
     """
     Will process SWE l0 data into l1a data.
 
@@ -24,8 +24,8 @@ def swe_l1a(file_path: str, data_version: str) -> xr.Dataset:
     Parameters
     ----------
-    file_path : str
-        Path where data is downloaded.
+    packet_file : str
+        Path where the raw packet file is stored.
     data_version : str
         Data version to write to CDF files and the Data_version CDF attribute.
         Should be in the format Vxxx.
 
@@ -35,15 +35,14 @@
     List
         List of xarray.Dataset.
     """
-    packets = decom_swe.decom_packets(file_path)
-
-    # group data by appId
-    grouped_data = group_by_apid(packets)
-
-    # TODO: figure out how to handle non-science data error
-    # Process science data packets
-    # sort data by acquisition time
-    sorted_packets = sort_by_time(grouped_data[SWEAPID.SWE_SCIENCE], "ACQ_START_COARSE")
-    logger.debug("Processing science data for [%s] packets", len(sorted_packets))
-
-    return swe_science(decom_data=sorted_packets, data_version=data_version)
+    xtce_document = (
+        f"{imap_module_directory}/swe/packet_definitions/swe_packet_definition.xml"
+    )
+    datasets_by_apid = packet_file_to_datasets(
+        packet_file, xtce_document, use_derived_value=False
+    )
+
+    # TODO: figure out how to handle non-science data
+    return swe_science(
+        l0_dataset=datasets_by_apid[SWEAPID.SWE_SCIENCE], data_version=data_version
+    )
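A minimal usage sketch of the updated entry point (the packet file name is hypothetical; the version string follows the Vxxx format noted in the docstring):

    from imap_processing.swe.l1a.swe_l1a import swe_l1a

    # Process a raw SWE packet file through to the L1A science dataset
    dataset = swe_l1a("imap_swe_l0_raw.pkts", data_version="V001")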
94 changes: 43 additions & 51 deletions imap_processing/swe/l1a/swe_science.py
@@ -1,16 +1,11 @@
 """Contains code to perform SWE L1a science processing."""
 
-import collections
 import logging
 
 import numpy as np
 import xarray as xr
 
 from imap_processing.cdf.imap_cdf_manager import ImapCdfAttributes
-from imap_processing.cdf.utils import met_to_j2000ns
-from imap_processing.swe.utils.swe_utils import (
-    add_metadata_to_array,
-)
 
 logger = logging.getLogger(__name__)
 
@@ -67,7 +62,7 @@ def decompressed_counts(cem_count: int) -> int:
 )
 
 
-def swe_science(decom_data: list, data_version: str) -> xr.Dataset:
+def swe_science(l0_dataset: xr.Dataset, data_version: str) -> xr.Dataset:
     """
     SWE L1a science processing.
 
@@ -97,8 +92,8 @@ def swe_science(decom_data: list, data_version: str) -> xr.Dataset:
     Parameters
     ----------
-    decom_data : list
-        Decompressed packet data.
+    l0_dataset : xarray.Dataset
+        Raw packet data from SWE stored as an xarray dataset.
     data_version : str
         Data version for the 'Data_version' CDF attribute. This is the version of the
 
@@ -109,49 +104,39 @@
     dataset : xarray.Dataset
         The xarray dataset with data.
     """
-    science_array = []
-    raw_science_array = []
-
-    metadata_arrays: dict[list] = collections.defaultdict(list)
-
     # We know we can only have 8 bit numbers input, so iterate over all
     # possibilities once up front
    decompression_table = np.array([decompressed_counts(i) for i in range(256)])
 
-    for data_packet in decom_data:
-        # read raw data
-        binary_data = data_packet.data["SCIENCE_DATA"].raw_value
-        # read binary string to an int and then convert it to
-        # bytes. This is to convert the string to bytes.
-        # Eg. "0000000011110011" --> b'\x00\xf3'
-        # 1260 = 15 seconds x 12 energy steps x 7 CEMs
-        byte_data = int(binary_data, 2).to_bytes(1260, byteorder="big")
-        # convert bytes to numpy array of uint8
-        raw_counts = np.frombuffer(byte_data, dtype=np.uint8)
-
-        # Uncompress counts. Decompressed data is a list of 1260
-        # where 1260 = 180 x 7 CEMs
-        # Take the "raw_counts" indices/counts mapping from
-        # decompression_table and then reshape the return
-        uncompress_data = np.take(decompression_table, raw_counts).reshape(180, 7)  # type: ignore[attr-defined]
-        # Save raw counts data as well
-        raw_counts = raw_counts.reshape(180, 7)
-
-        # Save data with its metadata field to attrs and DataArray of xarray.
-        # Save data as np.int64 to be compliant with ISTP's FILLVAL
-        science_array.append(uncompress_data.astype(np.int64))
-        raw_science_array.append(raw_counts.astype(np.int64))
-        metadata_arrays = add_metadata_to_array(data_packet, metadata_arrays)
+    # Loop through each packet individually with a list comprehension and
+    # perform the following steps:
+    # 1. Turn the binary string of 0s and 1s into an int
+    # 2. Convert the int into a bytes object of length 1260 (10080 / 8)
+    #    Eg. "0000000011110011" --> b'\x00\xf3'
+    #    1260 = 15 seconds x 12 energy steps x 7 CEMs
+    # 3. Read that bytes data into a numpy array of uint8 through the buffer protocol
+    # 4. Reshape the data to 180 x 7
+    raw_science_array = np.array(
+        [
+            np.frombuffer(
+                int(binary_string, 2).to_bytes(1260, byteorder="big"), dtype=np.uint8
+            ).reshape(180, 7)
+            for binary_string in l0_dataset["science_data"].values
+        ]
+    )
 
+    # Decompress the raw science data using numpy broadcasting logic.
+    # science_array will be the same shape as raw_science_array (npackets, 180, 7)
+    science_array = decompression_table[raw_science_array]
 
     # Load CDF attrs
     cdf_attrs = ImapCdfAttributes()
     cdf_attrs.add_instrument_global_attrs("swe")
     cdf_attrs.add_instrument_variable_attrs("swe", "l1a")
     cdf_attrs.add_global_attribute("Data_version", data_version)
 
-    epoch_converted_time = met_to_j2000ns(metadata_arrays["SHCOARSE"])
     epoch_time = xr.DataArray(
-        epoch_converted_time,
+        l0_dataset["epoch"],
         name="epoch",
         dims=["epoch"],
         attrs=cdf_attrs.get_variable_attributes("epoch"),
 
@@ -202,7 +187,7 @@ def swe_science(decom_data: list, data_version: str) -> xr.Dataset:
     # Add APID to global attrs for following processing steps
     l1a_global_attrs = cdf_attrs.get_global_attributes("imap_swe_l1a_sci")
     # Formatting to string to be compliant with ISTP
-    l1a_global_attrs["packet_apid"] = f"{decom_data[0].header['PKT_APID'].raw_value}"
+    # l1a_global_attrs["packet_apid"] = SWEAPID.SWE_SCIENCE.value
     dataset = xr.Dataset(
         coords={
             "epoch": epoch_time,
 
@@ -215,16 +200,23 @@
     )
     dataset["science_data"] = science_xarray
     dataset["raw_science_data"] = raw_science_xarray
+    # TODO: Remove the header in packet_file_to_datasets
+    # The science_data variable is also in the l1 dataset with different values
+    l0_dataset = l0_dataset.drop_vars(
+        [
+            "science_data",
+            "version",
+            "type",
+            "sec_hdr_flg",
+            "pkt_apid",
+            "seq_flgs",
+            "src_seq_ctr",
+            "pkt_len",
+        ]
+    )
+    for var_name, arr in l0_dataset.variables.items():
+        arr.attrs = cdf_attrs.get_variable_attributes(var_name)
+    dataset = dataset.merge(l0_dataset)
 
-    # create xarray dataset for each metadata field
-    for key, value in metadata_arrays.items():
-        # Lowercase the key to be compliant with ISTP's metadata field
-        metadata_field = key.lower()
-        dataset[metadata_field] = xr.DataArray(
-            value,
-            dims=["epoch"],
-            attrs=cdf_attrs.get_variable_attributes(metadata_field),
-        )
-
-    logger.info("SWE L1A science data process completed")
+    logger.info("SWE L1A science data processing completed.")
     return dataset
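To make the vectorized pipeline concrete, here is a self-contained toy version of the steps above, sized down and using a stand-in decompression curve (the real decompressed_counts table is built earlier in swe_science.py); all names and values here are illustrative:

    import numpy as np
    import xarray as xr

    # Steps 1-4: binary string -> int -> bytes -> uint8 array.
    # A 16-bit toy string stands in for the real 10080-bit science field.
    binary_string = "0000000011110011"
    nbytes = len(binary_string) // 8  # 1260 for a real packet (10080 / 8)
    raw = np.frombuffer(
        int(binary_string, 2).to_bytes(nbytes, byteorder="big"), dtype=np.uint8
    )
    print(raw)  # [  0 243], i.e. b'\x00\xf3'

    # Lookup-table decompression: precompute all 256 possible outputs once,
    # then decompress a whole (npackets, 180, 7) array by fancy indexing.
    decompression_table = np.array([2 * i for i in range(256)])  # stand-in curve
    raw_science = np.random.randint(0, 256, size=(3, 180, 7), dtype=np.uint8)
    science = decompression_table[raw_science]  # same shape as raw_science

    # Merge pattern from the end of swe_science: drop the CCSDS header fields
    # from the L0 dataset, then merge the remaining variables into the L1A
    # dataset along the shared epoch coordinate.
    epoch = np.arange(3)
    l0 = xr.Dataset(
        {"pkt_apid": ("epoch", [100] * 3), "shcoarse": ("epoch", epoch)},
        coords={"epoch": epoch},
    )
    l1a = xr.Dataset(coords={"epoch": epoch})
    l1a = l1a.merge(l0.drop_vars(["pkt_apid"]))  # keeps shcoarse only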