From 3e716926127ec39c3c3603393a271e9195c3ae68 Mon Sep 17 00:00:00 2001 From: Laura Sandoval <46567335+laspsandoval@users.noreply.github.com> Date: Tue, 27 Aug 2024 11:32:55 -0600 Subject: [PATCH] Add Pulse Height Calculations (#750) * add pulse height calculations --- .../ultra/unit/test_ultra_l1b_extended.py | 26 ++++ imap_processing/ultra/l1b/lookup_utils.py | 4 +- .../ultra/l1b/ultra_l1b_extended.py | 122 ++++++++++++++++++ 3 files changed, 151 insertions(+), 1 deletion(-) diff --git a/imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py b/imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py index b48c62afe..84b027666 100644 --- a/imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py +++ b/imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py @@ -1,12 +1,15 @@ """Tests Extended Raw Events for ULTRA L1b.""" +import numpy as np import pandas as pd import pytest from imap_processing.ultra.l1b.ultra_l1b_extended import ( + StopType, get_front_x_position, get_front_y_position, get_path_length, + get_ph_tof_and_back_positions, ) @@ -59,3 +62,26 @@ def test_get_path_length(de_dataset, yf_fixture): test_yb = df_filt["Yb"].astype("float").values r = get_path_length((test_xf, test_yf), (test_xb, test_yb), d) assert r == pytest.approx(df_filt["r"].astype("float"), abs=1e-5) + + +def test_get_ph_tof_and_back_positions( + de_dataset, + events_fsw_comparison_theta_0, +): + """Tests get_ph_tof_and_back_positions function.""" + + df = pd.read_csv(events_fsw_comparison_theta_0) + df_filt = df[df["StartType"] != -1] + + _, _, ph_xb, ph_yb = get_ph_tof_and_back_positions( + de_dataset, df_filt.Xf.astype("float").values, "ultra45" + ) + + ph_indices = np.nonzero( + np.isin(de_dataset["STOP_TYPE"], [StopType.Top.value, StopType.Bottom.value]) + )[0] + + selected_rows = df_filt.iloc[ph_indices] + + np.testing.assert_array_equal(ph_xb, selected_rows["Xb"].astype("float")) + np.testing.assert_array_equal(ph_yb, selected_rows["Yb"].astype("float")) diff --git a/imap_processing/ultra/l1b/lookup_utils.py b/imap_processing/ultra/l1b/lookup_utils.py index 08673c476..46585e959 100644 --- a/imap_processing/ultra/l1b/lookup_utils.py +++ b/imap_processing/ultra/l1b/lookup_utils.py @@ -78,7 +78,9 @@ def get_norm(dn: np.ndarray, key: str, file_label: str) -> npt.NDArray: else: tdc_norm_df = _TDC_NORM_DF_ULTRA90 - return tdc_norm_df[key].values[dn] + dn_norm = tdc_norm_df[key].iloc[dn].values + + return dn_norm def get_back_position(back_index: np.ndarray, key: str, file_label: str) -> npt.NDArray: diff --git a/imap_processing/ultra/l1b/ultra_l1b_extended.py b/imap_processing/ultra/l1b/ultra_l1b_extended.py index c977981fb..47c25b653 100644 --- a/imap_processing/ultra/l1b/ultra_l1b_extended.py +++ b/imap_processing/ultra/l1b/ultra_l1b_extended.py @@ -1,10 +1,15 @@ """Calculates Extended Raw Events for ULTRA L1b.""" +from enum import Enum + import numpy as np +import xarray from numpy import ndarray from imap_processing.ultra.l1b.lookup_utils import ( + get_back_position, get_image_params, + get_norm, get_y_adjust, ) @@ -18,6 +23,13 @@ # TODO: make lookup tables into config files. +class StopType(Enum): + """Stop Type: 1=Top, 2=Bottom.""" + + Top = 1 + Bottom = 2 + + def get_front_x_position(start_type: ndarray, start_position_tdc: ndarray) -> ndarray: """ Calculate the front xf position. @@ -111,6 +123,116 @@ def get_front_y_position(start_type: ndarray, yb: ndarray) -> tuple[ndarray, nda return np.array(d), np.array(yf) +def get_ph_tof_and_back_positions( + de_dataset: xarray.Dataset, xf: np.ndarray, sensor: str +) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + """ + Calculate back xb, yb position and tof. + + An incoming particle may trigger pulses from one of the stop anodes. + If so, four pulses are produced, one each from the north, south, + east, and west sides. + + The Time Of Flight (tof) and the position of the particle at the + back of the sensor are measured using the timing of the pulses. + Further description is available on pages 32-33 of + IMAP-Ultra Flight Software Specification document + (7523-9009_Rev_-.pdf). + + Parameters + ---------- + de_dataset : xarray.Dataset + Data in xarray format. + xf : np.array + X front position in (hundredths of a millimeter). + Has same length as de_dataset. + sensor : str + Sensor name. + + Returns + ------- + tof : np.array + Time of flight (tenths of a nanosecond). + t2 : np.array + Particle time of flight from start to stop (tenths of a nanosecond). + xb : np.array + Back positions in x direction (hundredths of a millimeter). + yb : np.array + Back positions in y direction (hundredths of a millimeter). + """ + indices = np.nonzero( + np.isin(de_dataset["STOP_TYPE"], [StopType.Top.value, StopType.Bottom.value]) + )[0] + de_filtered = de_dataset.isel(epoch=indices) + + xf_ph = xf[indices] + + # There are mismatches between the stop TDCs, i.e., SpN, SpS, SpE, and SpW. + # This normalizes the TDCs + sp_n_norm = get_norm(de_filtered["STOP_NORTH_TDC"].data, "SpN", sensor) + sp_s_norm = get_norm(de_filtered["STOP_SOUTH_TDC"].data, "SpS", sensor) + sp_e_norm = get_norm(de_filtered["STOP_EAST_TDC"].data, "SpE", sensor) + sp_w_norm = get_norm(de_filtered["STOP_WEST_TDC"].data, "SpW", sensor) + + # Convert normalized TDC values into units of hundredths of a + # millimeter using lookup tables. + xb_index = sp_s_norm - sp_n_norm + 2047 + yb_index = sp_e_norm - sp_w_norm + 2047 + + # Convert xf to a tof offset + tofx = sp_n_norm + sp_s_norm + tofy = sp_e_norm + sp_w_norm + + # tof is the average of the two tofs measured in the X and Y directions, + # tofx and tofy + # Units in tenths of a nanosecond + t1 = tofx + tofy # /2 incorporated into scale + + xb = np.zeros(len(indices)) + yb = np.zeros(len(indices)) + + # particle_tof (t2) used later to compute etof + t2 = np.zeros(len(indices)) + tof = np.zeros(len(indices)) + + # Stop Type: 1=Top, 2=Bottom + # Convert converts normalized TDC values into units of + # hundredths of a millimeter using lookup tables. + stop_type_top = de_filtered["STOP_TYPE"].data == StopType.Top.value + xb[stop_type_top] = get_back_position(xb_index[stop_type_top], "XBkTp", sensor) + yb[stop_type_top] = get_back_position(yb_index[stop_type_top], "YBkTp", sensor) + + # Correction for the propagation delay of the start anode and other effects. + t2[stop_type_top] = get_image_params("TOFSC") * t1[ + stop_type_top + ] + get_image_params("TOFTPOFF") + tof[stop_type_top] = t2[stop_type_top] + xf_ph[stop_type_top] * get_image_params( + "XFTTOF" + ) + + stop_type_bottom = de_filtered["STOP_TYPE"].data == StopType.Bottom.value + xb[stop_type_bottom] = get_back_position( + xb_index[stop_type_bottom], "XBkBt", sensor + ) + yb[stop_type_bottom] = get_back_position( + yb_index[stop_type_bottom], "YBkBt", sensor + ) + + # Correction for the propagation delay of the start anode and other effects. + t2[stop_type_bottom] = get_image_params("TOFSC") * t1[ + stop_type_bottom + ] + get_image_params("TOFBTOFF") # 10*ns + + tof[stop_type_bottom] = t2[stop_type_bottom] + xf_ph[ + stop_type_bottom + ] * get_image_params("XFTTOF") + + # Multiply by 100 to get tenths of a nanosecond. + tof = tof * 100 + + return tof, t2, xb, yb + + def get_path_length(front_position: tuple, back_position: tuple, d: float) -> float: """ Calculate the path length.