diff --git a/imap_processing/spice/kernels.py b/imap_processing/spice/kernels.py index 0a3f18493..87d73945b 100644 --- a/imap_processing/spice/kernels.py +++ b/imap_processing/spice/kernels.py @@ -173,7 +173,7 @@ def wrapper_ensure_spice(*args: Any, **kwargs: Any) -> Any: @contextmanager -def spice_ck_file(pointing_frame_path: str) -> Generator[int, None, None]: +def open_spice_ck_file(pointing_frame_path: Path) -> Generator[int, None, None]: """ Context manager for handling SPICE CK files. @@ -187,7 +187,13 @@ def spice_ck_file(pointing_frame_path: str) -> Generator[int, None, None]: handle : int Handle to the opened CK file. """ - handle = spice.ckopn(pointing_frame_path, "CK", 0) + # TODO: We will need to figure out if ck kernel changes + # and how that will affect appending to the pointing + # frame kernel. + if pointing_frame_path.exists(): + handle = spice.dafopw(str(pointing_frame_path)) + else: + handle = spice.ckopn(str(pointing_frame_path), "CK", 0) try: yield handle finally: @@ -195,125 +201,158 @@ def spice_ck_file(pointing_frame_path: str) -> Generator[int, None, None]: @ensure_spice -def create_pointing_frame(pointing_frame_path: Optional[Path] = None) -> Path: +def create_pointing_frame(pointing_frame_path: Path, ck_path: Path) -> None: """ Create the pointing frame. Parameters ---------- pointing_frame_path : Path - Directory of where pointing frame will be saved. + Location of pointing frame kernel. + ck_path : Path + Location of the CK kernel. - Returns - ------- - pointing_frame_path : Path - Path to pointing frame. + Notes + ----- + Kernels required to be furnished: + "imap_science_0001.tf", + "imap_sclk_0000.tsc", + "imap_sim_ck_2hr_2secsampling_with_nutation.bc" or + "sim_1yr_imap_attitude.bc", + "imap_wkcp.tf", + "naif0012.tls" + + Assumptions: + - The MOC has removed timeframe in which nutation/procession are present. + TODO: We may come back and have a check for this. + - We will continue to append to the pointing frame kernel. + TODO: Figure out how we want to handle the file size becoming too large. + - For now we can only furnish a single ck kernel. + TODO: This will not be the case once we add the ability to query the .csv. References ---------- https://numpydoc.readthedocs.io/en/latest/format.html#references """ - ck_kernel, _, _, _ = spice.kdata(0, "ck") - - # Get timerange for the pointing frame kernel. - et_start, et_end, et_times = _get_et_times(ck_kernel) - # Create a rotation matrix - rotation_matrix = _create_rotation_matrix(et_times) - - # Convert the rotation matrix to a quaternion. - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.m2q - q_avg = spice.m2q(rotation_matrix) - - # TODO: come up with naming convention. - if pointing_frame_path is None: - pointing_frame_path = Path(ck_kernel).parent / "imap_dps.bc" - - # Open a new CK file, returning the handle of the opened file. - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckopn - with spice_ck_file(str(pointing_frame_path)) as handle: - # Get the SCLK ID. - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.gipool - id_imap_sclk = spice.gipool("CK_-43000_SCLK", 0, 1) - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.sce2c - # Convert start and end times to SCLK. - sclk_begtim = spice.sce2c(int(id_imap_sclk), et_start) - sclk_endtim = spice.sce2c(int(id_imap_sclk), et_end) - - # Get the pointing frame ID. - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.gipool - id_imap_dps = spice.gipool("FRAME_IMAP_DPS", 0, 1) - # TODO: Figure out how to write new pointings to same CK kernel. - # Create the pointing frame kernel. - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckw02 - spice.ckw02( - # Handle of an open CK file. - handle, - # Start time of the segment. - sclk_begtim, - # End time of the segment. - sclk_endtim, - # Pointing frame ID. - int(id_imap_dps), - # Reference frame. - "ECLIPJ2000", # Reference frame - # Identifier. - "IMAP_DPS", - # Number of pointing intervals. - 1, - # Start times of individual pointing records within segment. - # Since there is only a single record this is equal to sclk_begtim. - np.array([sclk_begtim]), - # End times of individual pointing records within segment. - # Since there is only a single record this is equal to sclk_endtim. - np.array([sclk_endtim]), # Single stop time - # Average quaternion. - q_avg, - # 0.0 Angular rotation terms. - np.array([0.0, 0.0, 0.0]), - # Rates (seconds per tick) at which the quaternion and - # angular velocity change. - np.array([1.0]), - ) + # Get IDs. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.gipool + id_imap_dps = spice.gipool("FRAME_IMAP_DPS", 0, 1) + id_imap_sclk = spice.gipool("CK_-43000_SCLK", 0, 1) - return pointing_frame_path + # Verify that only ck_path kernel is loaded. + count = spice.ktotal("ck") + loaded_ck_kernel, _, _, _ = spice.kdata(count - 1, "ck") + if count != 1 or str(ck_path) != loaded_ck_kernel: + raise ValueError(f"Error: Expected CK kernel {ck_path}") -@ensure_spice -def _get_et_times(ck_kernel: str) -> tuple[float, float, np.ndarray]: + # If the pointing frame kernel already exists, find the last time. + if pointing_frame_path.exists(): + # Get the last time in the pointing frame kernel. + pointing_cover = spice.ckcov( + str(pointing_frame_path), int(id_imap_dps), True, "SEGMENT", 0, "TDB" + ) + num_segments = spice.wncard(pointing_cover) + _, et_end_pointing_frame = spice.wnfetd(pointing_cover, num_segments - 1) + else: + et_end_pointing_frame = None + + # TODO: Query for .csv file to get the pointing start and end times. + # TODO: Remove next four lines once query is added. + id_imap_spacecraft = spice.gipool("FRAME_IMAP_SPACECRAFT", 0, 1) + ck_cover = spice.ckcov( + str(ck_path), int(id_imap_spacecraft), True, "INTERVAL", 0, "TDB" + ) + num_intervals = spice.wncard(ck_cover) + + with open_spice_ck_file(pointing_frame_path) as handle: + # TODO: this will change to the number of pointings. + for i in range(num_intervals): + # Get the coverage window + # TODO: this will change to pointing start and end time. + et_start, et_end = spice.wnfetd(ck_cover, i) + et_times = _get_et_times(et_start, et_end) + + # TODO: remove after query is added. + if ( + et_end_pointing_frame is not None + and et_times[0] < et_end_pointing_frame + ): + break + + # Create a rotation matrix + rotation_matrix = _create_rotation_matrix(et_times) + + # Convert the rotation matrix to a quaternion. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.m2q + q_avg = spice.m2q(rotation_matrix) + + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.sce2c + # Convert start and end times to SCLK. + sclk_begtim = spice.sce2c(int(id_imap_sclk), et_times[0]) + sclk_endtim = spice.sce2c(int(id_imap_sclk), et_times[-1]) + + # Create the pointing frame kernel. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckw02 + spice.ckw02( + # Handle of an open CK file. + handle, + # Start time of the segment. + sclk_begtim, + # End time of the segment. + sclk_endtim, + # Pointing frame ID. + int(id_imap_dps), + # Reference frame. + "ECLIPJ2000", # Reference frame + # Identifier. + "IMAP_DPS", + # Number of pointing intervals. + 1, + # Start times of individual pointing records within segment. + # Since there is only a single record this is equal to sclk_begtim. + np.array([sclk_begtim]), + # End times of individual pointing records within segment. + # Since there is only a single record this is equal to sclk_endtim. + np.array([sclk_endtim]), # Single stop time + # Average quaternion. + q_avg, + # 0.0 Angular rotation terms. + np.array([0.0, 0.0, 0.0]), + # Rates (seconds per tick) at which the quaternion and + # angular velocity change. + np.array([1.0]), + ) + + +def _get_et_times(et_start: float, et_end: float) -> NDArray[np.float64]: """ Get times for pointing start and stop. Parameters ---------- - ck_kernel : str - Path of ck_kernel used to create the pointing frame. - - Returns - ------- et_start : float Pointing start time. et_end : float Pointing end time. + + Returns + ------- et_times : numpy.ndarray Array of times between et_start and et_end. """ - # Get the spacecraft ID. - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.gipool - id_imap_spacecraft = spice.gipool("FRAME_IMAP_SPACECRAFT", 0, 1) - # TODO: Queried pointing start and stop times here. # TODO removing the @ensure_spice decorator when using the repointing table. - # Get the coverage window - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckcov - cover = spice.ckcov(ck_kernel, int(id_imap_spacecraft), True, "SEGMENT", 0, "TDB") - # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.wnfetd - et_start, et_end = spice.wnfetd(cover, 0) - # 1 spin/15 seconds; 10 quaternions / spin + # 1 spin/15 seconds; 10 quaternions / spin. num_samples = (et_end - et_start) / 15 * 10 - et_times = np.linspace(et_start, et_end, int(num_samples)) + # There were rounding errors when using spice.pxform so np.ceil and np.floor + # were used to ensure the start and end times were included in the array. + et_times = np.linspace( + np.ceil(et_start * 1e6) / 1e6, np.floor(et_end * 1e6) / 1e6, int(num_samples) + ) - return et_start, et_end, et_times + return et_times @ensure_spice diff --git a/imap_processing/tests/spice/test_data/sim_1yr_imap_attitude.bc b/imap_processing/tests/spice/test_data/sim_1yr_imap_attitude.bc new file mode 100755 index 000000000..589f2f08f Binary files /dev/null and b/imap_processing/tests/spice/test_data/sim_1yr_imap_attitude.bc differ diff --git a/imap_processing/tests/spice/test_data/sim_1yr_imap_pointing_frame.bc b/imap_processing/tests/spice/test_data/sim_1yr_imap_pointing_frame.bc new file mode 100644 index 000000000..8ab7f76e3 Binary files /dev/null and b/imap_processing/tests/spice/test_data/sim_1yr_imap_pointing_frame.bc differ diff --git a/imap_processing/tests/spice/test_kernels.py b/imap_processing/tests/spice/test_kernels.py index 04b30281c..ed2a68b70 100644 --- a/imap_processing/tests/spice/test_kernels.py +++ b/imap_processing/tests/spice/test_kernels.py @@ -28,13 +28,29 @@ def pointing_frame_kernels(spice_test_data_path): return kernels +@pytest.fixture() +def multiple_pointing_kernels(spice_test_data_path): + """List SPICE kernels.""" + required_kernels = [ + "imap_science_0001.tf", + "imap_sclk_0000.tsc", + "sim_1yr_imap_attitude.bc", + "imap_wkcp.tf", + "naif0012.tls", + ] + kernels = [str(spice_test_data_path / kernel) for kernel in required_kernels] + return kernels + + @pytest.fixture() def et_times(pointing_frame_kernels): """Tests get_et_times function.""" spice.furnsh(pointing_frame_kernels) - file, _, _, _ = spice.kdata(0, "ck") - et_start, et_end, et_times = _get_et_times(file) + ck_kernel, _, _, _ = spice.kdata(0, "ck") + ck_cover = spice.ckcov(ck_kernel, -43000, True, "INTERVAL", 0, "TDB") + et_start, et_end = spice.wnfetd(ck_cover, 0) + et_times = _get_et_times(et_start, et_end) return et_times @@ -129,19 +145,23 @@ def test_create_rotation_matrix(et_times, pointing_frame_kernels): np.testing.assert_allclose(rotation_matrix, rotation_matrix_expected, atol=1e-4) -def test_create_pointing_frame(spice_test_data_path, pointing_frame_kernels, tmp_path): +def test_create_pointing_frame( + spice_test_data_path, pointing_frame_kernels, tmp_path, et_times +): """Tests create_pointing_frame function.""" + spice.kclear() spice.furnsh(pointing_frame_kernels) - ck_kernel, _, _, _ = spice.kdata(0, "ck") - et_start, et_end, et_times = _get_et_times(ck_kernel) - create_pointing_frame(pointing_frame_path=tmp_path / "imap_dps.bc") + create_pointing_frame( + pointing_frame_path=tmp_path / "imap_dps.bc", + ck_path=spice_test_data_path / "imap_sim_ck_2hr_2secsampling_with_nutation.bc", + ) # After imap_dps.bc has been created. dps_kernel = str(tmp_path / "imap_dps.bc") spice.furnsh(dps_kernel) - rotation_matrix_1 = spice.pxform("ECLIPJ2000", "IMAP_DPS", et_start + 100) - rotation_matrix_2 = spice.pxform("ECLIPJ2000", "IMAP_DPS", et_start + 1000) + rotation_matrix_1 = spice.pxform("ECLIPJ2000", "IMAP_DPS", et_times[0] + 100) + rotation_matrix_2 = spice.pxform("ECLIPJ2000", "IMAP_DPS", et_times[0] + 1000) # All the rotation matrices should be the same. assert np.array_equal(rotation_matrix_1, rotation_matrix_2) @@ -155,18 +175,96 @@ def test_create_pointing_frame(spice_test_data_path, pointing_frame_kernels, tmp # Verify imap_dps.bc has been created. assert (tmp_path / "imap_dps.bc").exists() + # Tests error handling when incorrect kernel is loaded. + spice.furnsh(pointing_frame_kernels) + with pytest.raises( + ValueError, match="Error: Expected CK kernel badname_kernel.bc" + ): # Replace match string with expected error message + create_pointing_frame( + pointing_frame_path=tmp_path / "imap_dps.bc", ck_path="badname_kernel.bc" + ) + -@ensure_spice def test_et_times(pointing_frame_kernels): """Tests get_et_times function.""" spice.furnsh(pointing_frame_kernels) - file, _, _, _ = spice.kdata(0, "ck") - et_start, et_end, et_times = _get_et_times(file) + ck_kernel, _, _, _ = spice.kdata(0, "ck") + ck_cover = spice.ckcov(ck_kernel, -43000, True, "INTERVAL", 0, "TDB") + et_start, et_end = spice.wnfetd(ck_cover, 0) + et_times = _get_et_times(et_start, et_end) - assert et_start == 802008069.184905 - assert et_end == 802015267.184906 assert et_times[0] == et_start assert et_times[-1] == et_end return et_times + + +def test_multiple_attempts(pointing_frame_kernels, tmp_path, spice_test_data_path): + """Tests create_pointing_frame function with multiple pointing kernels.""" + spice.furnsh(pointing_frame_kernels) + + # Check that a single segment is added regardless of how many times + # create_pointing_frame is called. + create_pointing_frame( + pointing_frame_path=tmp_path / "imap_dps.bc", + ck_path=spice_test_data_path / "imap_sim_ck_2hr_2secsampling_with_nutation.bc", + ) + ck_cover = spice.ckcov( + str(tmp_path / "imap_dps.bc"), -43901, True, "INTERVAL", 0, "TDB" + ) + num_intervals = spice.wncard(ck_cover) + assert num_intervals == 1 + + create_pointing_frame( + pointing_frame_path=tmp_path / "imap_dps.bc", + ck_path=spice_test_data_path / "imap_sim_ck_2hr_2secsampling_with_nutation.bc", + ) + ck_cover = spice.ckcov( + str(tmp_path / "imap_dps.bc"), -43901, True, "INTERVAL", 0, "TDB" + ) + num_intervals = spice.wncard(ck_cover) + assert num_intervals == 1 + + +def test_multiple_pointings(pointing_frame_kernels, spice_test_data_path, tmp_path): + """Tests create_pointing_frame function with multiple pointing kernels.""" + spice.furnsh(pointing_frame_kernels) + + create_pointing_frame( + pointing_frame_path=tmp_path / "imap_pointing_frame.bc", + ck_path=spice_test_data_path / "imap_sim_ck_2hr_2secsampling_with_nutation.bc", + ) + ck_cover_pointing = spice.ckcov( + str(tmp_path / "imap_pointing_frame.bc"), + -43901, + True, + "INTERVAL", + 0, + "TDB", + ) + num_intervals = spice.wncard(ck_cover_pointing) + et_start_pointing, et_end_pointing = spice.wnfetd(ck_cover_pointing, 0) + + ck_cover = spice.ckcov( + str(spice_test_data_path / "imap_sim_ck_2hr_2secsampling_with_nutation.bc"), + -43000, + True, + "INTERVAL", + 0, + "TDB", + ) + num_intervals_expected = spice.wncard(ck_cover) + et_start_expected, et_end_expected = spice.wnfetd(ck_cover, 0) + + assert num_intervals == num_intervals_expected + assert et_start_pointing == et_start_expected + assert et_end_pointing == et_end_expected + + et_times = _get_et_times(et_start_pointing, et_end_pointing) + + spice.furnsh(str(tmp_path / "imap_pointing_frame.bc")) + rotation_matrix_1 = spice.pxform("ECLIPJ2000", "IMAP_DPS", et_times[100]) + rotation_matrix_2 = spice.pxform("ECLIPJ2000", "IMAP_DPS", et_times[1000]) + + assert np.array_equal(rotation_matrix_1, rotation_matrix_2)