Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pulse Height Calculations #750

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions imap_processing/tests/ultra/unit/test_ultra_l1b_extended.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
"""Tests Extended Raw Events for ULTRA L1b."""

import numpy as np
import pandas as pd
import pytest

from imap_processing.ultra.l1b.ultra_l1b_extended import (
StopType,
get_front_x_position,
get_front_y_position,
get_path_length,
get_ph_tof_and_back_positions,
)


Expand Down Expand Up @@ -59,3 +62,26 @@ def test_get_path_length(de_dataset, yf_fixture):
test_yb = df_filt["Yb"].astype("float").values
r = get_path_length((test_xf, test_yf), (test_xb, test_yb), d)
assert r == pytest.approx(df_filt["r"].astype("float"), abs=1e-5)


def test_get_ph_tof_and_back_positions(
de_dataset,
events_fsw_comparison_theta_0,
):
"""Tests get_ph_tof_and_back_positions function."""

df = pd.read_csv(events_fsw_comparison_theta_0)
df_filt = df[df["StartType"] != -1]

_, _, ph_xb, ph_yb = get_ph_tof_and_back_positions(
de_dataset, df_filt.Xf.astype("float").values, "ultra45"
)

ph_indices = np.nonzero(
np.isin(de_dataset["STOP_TYPE"], [StopType.Top.value, StopType.Bottom.value])
)[0]

selected_rows = df_filt.iloc[ph_indices]

np.testing.assert_array_equal(ph_xb, selected_rows["Xb"].astype("float"))
np.testing.assert_array_equal(ph_yb, selected_rows["Yb"].astype("float"))
4 changes: 3 additions & 1 deletion imap_processing/ultra/l1b/lookup_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ def get_norm(dn: np.ndarray, key: str, file_label: str) -> npt.NDArray:
else:
tdc_norm_df = _TDC_NORM_DF_ULTRA90

return tdc_norm_df[key].values[dn]
dn_norm = tdc_norm_df[key].iloc[dn].values

return dn_norm


def get_back_position(back_index: np.ndarray, key: str, file_label: str) -> npt.NDArray:
Expand Down
122 changes: 122 additions & 0 deletions imap_processing/ultra/l1b/ultra_l1b_extended.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
"""Calculates Extended Raw Events for ULTRA L1b."""

from enum import Enum

import numpy as np
import xarray
from numpy import ndarray

from imap_processing.ultra.l1b.lookup_utils import (
get_back_position,
get_image_params,
get_norm,
get_y_adjust,
)

Expand All @@ -18,6 +23,13 @@
# TODO: make lookup tables into config files.


class StopType(Enum):
"""Stop Type: 1=Top, 2=Bottom."""

Top = 1
Bottom = 2


def get_front_x_position(start_type: ndarray, start_position_tdc: ndarray) -> ndarray:
"""
Calculate the front xf position.
Expand Down Expand Up @@ -111,6 +123,116 @@ def get_front_y_position(start_type: ndarray, yb: ndarray) -> tuple[ndarray, nda
return np.array(d), np.array(yf)


def get_ph_tof_and_back_positions(
de_dataset: xarray.Dataset, xf: np.ndarray, sensor: str
) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
laspsandoval marked this conversation as resolved.
Show resolved Hide resolved
"""
Calculate back xb, yb position and tof.

An incoming particle may trigger pulses from one of the stop anodes.
If so, four pulses are produced, one each from the north, south,
east, and west sides.

The Time Of Flight (tof) and the position of the particle at the
back of the sensor are measured using the timing of the pulses.
Further description is available on pages 32-33 of
IMAP-Ultra Flight Software Specification document
(7523-9009_Rev_-.pdf).

Parameters
----------
de_dataset : xarray.Dataset
Data in xarray format.
xf : np.array
laspsandoval marked this conversation as resolved.
Show resolved Hide resolved
X front position in (hundredths of a millimeter).
Has same length as de_dataset.
sensor : str
Sensor name.

Returns
-------
tof : np.array
Time of flight (tenths of a nanosecond).
t2 : np.array
Particle time of flight from start to stop (tenths of a nanosecond).
xb : np.array
Back positions in x direction (hundredths of a millimeter).
yb : np.array
Back positions in y direction (hundredths of a millimeter).
"""
indices = np.nonzero(
np.isin(de_dataset["STOP_TYPE"], [StopType.Top.value, StopType.Bottom.value])
)[0]
de_filtered = de_dataset.isel(epoch=indices)

xf_ph = xf[indices]

# There are mismatches between the stop TDCs, i.e., SpN, SpS, SpE, and SpW.
# This normalizes the TDCs
sp_n_norm = get_norm(de_filtered["STOP_NORTH_TDC"].data, "SpN", sensor)
sp_s_norm = get_norm(de_filtered["STOP_SOUTH_TDC"].data, "SpS", sensor)
sp_e_norm = get_norm(de_filtered["STOP_EAST_TDC"].data, "SpE", sensor)
sp_w_norm = get_norm(de_filtered["STOP_WEST_TDC"].data, "SpW", sensor)

# Convert normalized TDC values into units of hundredths of a
# millimeter using lookup tables.
xb_index = sp_s_norm - sp_n_norm + 2047
yb_index = sp_e_norm - sp_w_norm + 2047

# Convert xf to a tof offset
tofx = sp_n_norm + sp_s_norm
tofy = sp_e_norm + sp_w_norm

# tof is the average of the two tofs measured in the X and Y directions,
# tofx and tofy
# Units in tenths of a nanosecond
t1 = tofx + tofy # /2 incorporated into scale

xb = np.zeros(len(indices))
yb = np.zeros(len(indices))

# particle_tof (t2) used later to compute etof
t2 = np.zeros(len(indices))
tof = np.zeros(len(indices))

# Stop Type: 1=Top, 2=Bottom
# Convert converts normalized TDC values into units of
# hundredths of a millimeter using lookup tables.
stop_type_top = de_filtered["STOP_TYPE"].data == StopType.Top.value
xb[stop_type_top] = get_back_position(xb_index[stop_type_top], "XBkTp", sensor)
yb[stop_type_top] = get_back_position(yb_index[stop_type_top], "YBkTp", sensor)

# Correction for the propagation delay of the start anode and other effects.
t2[stop_type_top] = get_image_params("TOFSC") * t1[
stop_type_top
] + get_image_params("TOFTPOFF")
tof[stop_type_top] = t2[stop_type_top] + xf_ph[stop_type_top] * get_image_params(
"XFTTOF"
)

stop_type_bottom = de_filtered["STOP_TYPE"].data == StopType.Bottom.value
xb[stop_type_bottom] = get_back_position(
xb_index[stop_type_bottom], "XBkBt", sensor
)
yb[stop_type_bottom] = get_back_position(
yb_index[stop_type_bottom], "YBkBt", sensor
)

# Correction for the propagation delay of the start anode and other effects.
t2[stop_type_bottom] = get_image_params("TOFSC") * t1[
stop_type_bottom
] + get_image_params("TOFBTOFF") # 10*ns

tof[stop_type_bottom] = t2[stop_type_bottom] + xf_ph[
stop_type_bottom
] * get_image_params("XFTTOF")

# Multiply by 100 to get tenths of a nanosecond.
tof = tof * 100

return tof, t2, xb, yb


def get_path_length(front_position: tuple, back_position: tuple, d: float) -> float:
"""
Calculate the path length.
Expand Down
Loading