From 6aab07c9d15c21795beb3d94b727b15226c10b6a Mon Sep 17 00:00:00 2001 From: Laura Sandoval Date: Thu, 29 Aug 2024 16:15:01 -0600 Subject: [PATCH] including new function for decom --- imap_processing/ialirt/l0/process_hit.py | 18 +-- ...ecom_ialirt_hit.py => test_process_hit.py} | 112 +++--------------- 2 files changed, 27 insertions(+), 103 deletions(-) rename imap_processing/tests/ialirt/unit/{test_decom_ialirt_hit.py => test_process_hit.py} (51%) diff --git a/imap_processing/ialirt/l0/process_hit.py b/imap_processing/ialirt/l0/process_hit.py index 4a4e4ef52..6d540e2aa 100644 --- a/imap_processing/ialirt/l0/process_hit.py +++ b/imap_processing/ialirt/l0/process_hit.py @@ -80,22 +80,22 @@ def find_groups(data: xr.Dataset) -> xr.Dataset: """ subcom_range = (0, 59) - data = data.sortby("HIT_SC_TICK", ascending=True) + data = data.sortby("hit_sc_tick", ascending=True) # Filter out data before the first subcom=0 and after the last subcom=59 - start_sc_ticks = data["HIT_SC_TICK"][(data["HIT_SUBCOM"] == subcom_range[0]).values] + start_sc_ticks = data["hit_sc_tick"][(data["hit_subcom"] == subcom_range[0]).values] start_sc_tick = start_sc_ticks.data.min() - last_sc_ticks = data["HIT_SC_TICK"][([data["HIT_SUBCOM"] == subcom_range[-1]][-1])] + last_sc_ticks = data["hit_sc_tick"][([data["hit_subcom"] == subcom_range[-1]][-1])] last_sc_tick = last_sc_ticks.data.max() grouped_data = data.where( - (data["HIT_SC_TICK"] >= start_sc_tick) & (data["HIT_SC_TICK"] <= last_sc_tick), + (data["hit_sc_tick"] >= start_sc_tick) & (data["hit_sc_tick"] <= last_sc_tick), drop=True, ) # Create group labels group_labels = np.searchsorted( - start_sc_ticks, grouped_data["HIT_SC_TICK"], side="right" + start_sc_ticks, grouped_data["hit_sc_tick"], side="right" ) grouped_data["group"] = ("group", group_labels) @@ -161,16 +161,16 @@ def process_hit(xarray_data: xr.Dataset) -> list[dict]: grouped_data = find_groups(xarray_data) for group in np.unique(grouped_data["group"]): - fast_rate_1 = grouped_data["HIT_FAST_RATE_1"][ + fast_rate_1 = grouped_data["hit_fast_rate_1"][ (grouped_data["group"] == group).values ] - fast_rate_2 = grouped_data["HIT_FAST_RATE_2"][ + fast_rate_2 = grouped_data["hit_fast_rate_2"][ (grouped_data["group"] == group).values ] - slow_rate = grouped_data["HIT_SLOW_RATE"][ + slow_rate = grouped_data["hit_slow_rate"][ (grouped_data["group"] == group).values ] - met = int(grouped_data["HIT_MET"][(grouped_data["group"] == group).values][0]) + met = int(grouped_data["hit_met"][(grouped_data["group"] == group).values][0]) # Verify that each group has 60 datapoints if len(slow_rate.data) != 60: diff --git a/imap_processing/tests/ialirt/unit/test_decom_ialirt_hit.py b/imap_processing/tests/ialirt/unit/test_process_hit.py similarity index 51% rename from imap_processing/tests/ialirt/unit/test_decom_ialirt_hit.py rename to imap_processing/tests/ialirt/unit/test_process_hit.py index 0124227f6..77f7a58c0 100644 --- a/imap_processing/tests/ialirt/unit/test_decom_ialirt_hit.py +++ b/imap_processing/tests/ialirt/unit/test_process_hit.py @@ -1,9 +1,6 @@ -import numpy as np -import pandas as pd import pytest from imap_processing import imap_module_directory -from imap_processing.decom import decom_packets from imap_processing.ialirt.l0.decom_ialirt import generate_xarray from imap_processing.ialirt.l0.process_hit import ( HITPrefixes, @@ -32,44 +29,17 @@ def binary_packet_path(): ) -@pytest.fixture(scope="session") -def hit_test_data(): - """Returns the xtce auxiliary directory.""" - - data_path = ( - imap_module_directory - / "tests" - / "ialirt" - / "test_data" - / "l0" - / "hit_ialirt_sample.csv" - ) - data = pd.read_csv(data_path, na_values=[" ", ""]) - - return data - - -@pytest.fixture() -def decom_packets_data(binary_packet_path, xtce_hit_path): - """Read packet data from file using decom_packets""" - data_packet_list = decom_packets(binary_packet_path, xtce_hit_path) - return data_packet_list - - @pytest.fixture() def xarray_data(binary_packet_path, xtce_hit_path): """Create xarray data""" + apid = 1253 + xarray_data = generate_xarray( binary_packet_path, xtce_hit_path, time_keys={"HIT": "HIT_SC_TICK"} - ) + )[apid] return xarray_data -def test_length(decom_packets_data, hit_test_data): - """Test if total packets in data file is correct""" - assert len(decom_packets_data) == len(hit_test_data) - - def generate_prefixes(prefixes): return [f"{prefix}_{i:02d}" for i in range(15) for prefix in prefixes] @@ -131,23 +101,23 @@ def test_prefixes(): def test_find_groups(xarray_data): """Tests find_groups""" - filtered_data = find_groups(xarray_data["HIT"]) + filtered_data = find_groups(xarray_data) - assert filtered_data["HIT_SUBCOM"].values[0] == 0 - assert filtered_data["HIT_SUBCOM"].values[-1] == 59 - assert len(filtered_data["HIT_SUBCOM"]) / 60 == 15 - assert len(filtered_data["HIT_SC_TICK"][filtered_data["HIT_SUBCOM"] == 0]) == 15 - assert len(filtered_data["HIT_SC_TICK"][filtered_data["HIT_SUBCOM"] == 59]) == 15 + assert filtered_data["hit_subcom"].values[0] == 0 + assert filtered_data["hit_subcom"].values[-1] == 59 + assert len(filtered_data["hit_subcom"]) / 60 == 15 + assert len(filtered_data["hit_sc_tick"][filtered_data["hit_subcom"] == 0]) == 15 + assert len(filtered_data["hit_sc_tick"][filtered_data["hit_subcom"] == 59]) == 15 def test_create_l1(xarray_data): """Tests create_l1""" - filtered_data = find_groups(xarray_data["HIT"]) + filtered_data = find_groups(xarray_data) - fast_rate_1 = filtered_data["HIT_FAST_RATE_1"][(filtered_data["group"] == 4).values] - fast_rate_2 = filtered_data["HIT_FAST_RATE_2"][(filtered_data["group"] == 4).values] - slow_rate = filtered_data["HIT_SLOW_RATE"][(filtered_data["group"] == 4).values] + fast_rate_1 = filtered_data["hit_fast_rate_1"][(filtered_data["group"] == 4).values] + fast_rate_2 = filtered_data["hit_fast_rate_2"][(filtered_data["group"] == 4).values] + slow_rate = filtered_data["hit_slow_rate"][(filtered_data["group"] == 4).values] l1 = create_l1(fast_rate_1, fast_rate_2, slow_rate) @@ -161,13 +131,13 @@ def test_process_hit(xarray_data, caplog): """Tests process_hit.""" # Tests that it functions normally - hit_product = process_hit(xarray_data["HIT"]) + hit_product = process_hit(xarray_data) assert len(hit_product) == 15 # Make a subset of data that has values to check the calculations of process hit. - indices = (xarray_data["HIT"]["HIT_MET"] != 0).values.nonzero()[0] - xarray_data["HIT"]["HIT_SLOW_RATE"].values[indices[0] : indices[0] + 60] = 2 - subset = xarray_data["HIT"].isel(HIT_SC_TICK=slice(indices[0], indices[0] + 60)) + indices = (xarray_data["hit_met"] != 0).values.nonzero()[0] + xarray_data["hit_slow_rate"].values[indices[0] : indices[0] + 60] = 2 + subset = xarray_data.isel(epoch=slice(indices[0], indices[0] + 60)) hit_product = process_hit(subset) @@ -179,7 +149,7 @@ def test_process_hit(xarray_data, caplog): assert hit_product[0]["hit_high_energy_He_omni"] == 2 # Create a scrambled set of subcom values. - xarray_data["HIT"]["HIT_SUBCOM"].values[indices[0] : indices[0] + 60] = [ + xarray_data["hit_subcom"].values[indices[0] : indices[0] + 60] = [ i for i in range(29) for _ in range(2) ] + [59, 59] @@ -189,49 +159,3 @@ def test_process_hit(xarray_data, caplog): assert any( "Incorrect number of packets" in record.message for record in caplog.records ) - - -def test_decom_packets(xarray_data, hit_test_data): - """This function checks that all instrument parameters are accounted for.""" - - fast_rate_1 = xarray_data["HIT"]["HIT_FAST_RATE_1"] - fast_rate_2 = xarray_data["HIT"]["HIT_FAST_RATE_2"] - slow_rate = xarray_data["HIT"]["HIT_SLOW_RATE"] - - # The sequence begins where "HIT_MET" != 0 - start_index = hit_test_data.index[hit_test_data["MET"] != 0][3] - - # Test Fast Rate 1 - ccsds_fast_rate_1 = fast_rate_1[start_index : start_index + 60] - test_fast_rate_1 = hit_test_data.loc[ - start_index : start_index + 59, - hit_test_data.columns.str.startswith(tuple(HITPrefixes.FAST_RATE_1.value)), - ] - flat_test_fast_rate_1 = test_fast_rate_1.to_numpy().flatten() - np.testing.assert_array_equal( - ccsds_fast_rate_1.values, - flat_test_fast_rate_1[~np.isnan(flat_test_fast_rate_1)], - ) - - # Test Fast Rate 2 - ccsds_fast_rate_2 = fast_rate_2[start_index : start_index + 60] - test_fast_rate_2 = hit_test_data.loc[ - start_index : start_index + 59, - hit_test_data.columns.str.startswith(tuple(HITPrefixes.FAST_RATE_2.value)), - ] - flat_test_fast_rate_2 = test_fast_rate_2.to_numpy().flatten() - np.testing.assert_array_equal( - ccsds_fast_rate_2.values, - flat_test_fast_rate_2[~np.isnan(flat_test_fast_rate_2)], - ) - - # Test Slow Rate - ccsds_slow_rate = slow_rate[start_index : start_index + 60] - test_slow_rate = hit_test_data.loc[ - start_index : start_index + 59, - hit_test_data.columns.isin(HITPrefixes.SLOW_RATE.value), - ] - flat_test_slow_rate = test_slow_rate.to_numpy().flatten() - np.testing.assert_array_equal( - ccsds_slow_rate.values, flat_test_slow_rate[~np.isnan(flat_test_slow_rate)] - )