diff --git a/imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py b/imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py index 84b027666..2f9ed9826 100644 --- a/imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py +++ b/imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py @@ -5,11 +5,13 @@ import pytest from imap_processing.ultra.l1b.ultra_l1b_extended import ( + StartType, StopType, get_front_x_position, get_front_y_position, get_path_length, get_ph_tof_and_back_positions, + get_ssd_back_position_and_tof_offset, ) @@ -85,3 +87,43 @@ def test_get_ph_tof_and_back_positions( np.testing.assert_array_equal(ph_xb, selected_rows["Xb"].astype("float")) np.testing.assert_array_equal(ph_yb, selected_rows["Yb"].astype("float")) + + +def test_get_ssd_back_position_and_tof_offset( + de_dataset, + events_fsw_comparison_theta_0, +): + """Tests get_ssd_back_position function.""" + yb, tof_offset, ssd_number = get_ssd_back_position_and_tof_offset(de_dataset) + + df = pd.read_csv(events_fsw_comparison_theta_0) + df_filt = df[(df["StartType"] != -1) & (df["StopType"] >= 8)] + + np.testing.assert_array_equal(yb, df_filt["Yb"].astype("float")) + + tof_offset_lt = tof_offset[df_filt["StartType"] == StartType.Left.value] + tof_offset_rt = tof_offset[df_filt["StartType"] == StartType.Right.value] + + ssd_number_lt = ssd_number[df_filt["StartType"] == StartType.Left.value] + ssd_number_rt = ssd_number[df_filt["StartType"] == StartType.Right.value] + + np.testing.assert_array_equal( + tof_offset_lt[ssd_number_lt == 3], + np.full(len(tof_offset_lt[ssd_number_lt == 3]), -4.2), + ) + np.testing.assert_array_equal( + tof_offset_rt[ssd_number_rt == 7], + np.full(len(tof_offset_rt[ssd_number_rt == 7]), -6), + ) + np.testing.assert_array_equal( + tof_offset_rt[ssd_number_rt == 4], + np.full(len(tof_offset_rt[ssd_number_rt == 4]), -4), + ) + + assert np.all(ssd_number_lt >= 0), "Values in ssd_number_lt out of range." + + assert np.all(ssd_number_lt <= 7), "Values in ssd_number_lt out of range." + + assert np.all(ssd_number_rt >= 0), "Values in ssd_number_rt out of range." + + assert np.all(ssd_number_rt <= 7), "Values in ssd_number_rt out of range." diff --git a/imap_processing/ultra/l1b/ultra_l1b_extended.py b/imap_processing/ultra/l1b/ultra_l1b_extended.py index 47c25b653..a70e5e032 100644 --- a/imap_processing/ultra/l1b/ultra_l1b_extended.py +++ b/imap_processing/ultra/l1b/ultra_l1b_extended.py @@ -1,6 +1,7 @@ """Calculates Extended Raw Events for ULTRA L1b.""" from enum import Enum +from typing import ClassVar import numpy as np import xarray @@ -23,11 +24,19 @@ # TODO: make lookup tables into config files. +class StartType(Enum): + """Start Type: 1=Left, 2=Right.""" + + Left = 1 + Right = 2 + + class StopType(Enum): - """Stop Type: 1=Top, 2=Bottom.""" + """Stop Type: 1=Top, 2=Bottom, SSD: 8-15.""" Top = 1 Bottom = 2 + SSD: ClassVar[list[int]] = [8, 9, 10, 11, 12, 13, 14, 15] def get_front_x_position(start_type: ndarray, start_position_tdc: ndarray) -> ndarray: @@ -258,3 +267,51 @@ def get_path_length(front_position: tuple, back_position: tuple, d: float) -> fl ) return r + + +def get_ssd_back_position_and_tof_offset( + de_dataset: xarray.Dataset, +) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """ + Lookup the Y SSD positions (yb), TOF Offset, and SSD number. + + Parameters + ---------- + de_dataset : xarray.Dataset + The input dataset containing STOP_TYPE and SSD_FLAG data. + + Returns + ------- + yb : np.ndarray + Y SSD positions in hundredths of a millimeter. + tof_offset : np.ndarray + TOF offset. + ssd_number : np.ndarray + SSD number. + + Notes + ----- + The X back position (xb) is assumed to be 0 for SSD. + """ + indices = np.nonzero(np.isin(de_dataset["STOP_TYPE"], StopType.SSD.value))[0] + de_filtered = de_dataset.isel(epoch=indices) + + yb = np.zeros(len(indices), dtype=np.float64) + ssd_number = np.zeros(len(indices), dtype=int) + tof_offset = np.zeros(len(indices), dtype=np.float64) + + for i in range(8): + ssd_flag_mask = de_filtered[f"SSD_FLAG_{i}"].data == 1 + + # Multiply ybs times 100 to convert to hundredths of a millimeter. + yb[ssd_flag_mask] = get_image_params(f"YBKSSD{i}") * 100 + ssd_number[ssd_flag_mask] = i + + tof_offset[ + (de_filtered["START_TYPE"] == StartType.Left.value) & ssd_flag_mask + ] = get_image_params(f"TOFSSDLTOFF{i}") + tof_offset[ + (de_filtered["START_TYPE"] == StartType.Right.value) & ssd_flag_mask + ] = get_image_params(f"TOFSSDRTOFF{i}") + + return yb, tof_offset, ssd_number