From 64b0a4c1836af82203483a4dec5896224d2aa1de Mon Sep 17 00:00:00 2001 From: Laura Sandoval Date: Thu, 22 Aug 2024 12:57:29 -0600 Subject: [PATCH] move to kernels.py --- imap_processing/spice/kernels.py | 214 ++++++++++++++++++++ imap_processing/tests/spice/test_kernels.py | 101 +++++++++ 2 files changed, 315 insertions(+) diff --git a/imap_processing/spice/kernels.py b/imap_processing/spice/kernels.py index 2d4e21154..aca6cce21 100644 --- a/imap_processing/spice/kernels.py +++ b/imap_processing/spice/kernels.py @@ -3,9 +3,12 @@ import functools import logging import os +from pathlib import Path from typing import Any, Callable, Optional +import numpy as np import spiceypy as spice +from numpy.typing import NDArray from spiceypy.utils.exceptions import SpiceyError logger = logging.getLogger(__name__) @@ -156,3 +159,214 @@ def wrapper_ensure_spice(*args: Any, **kwargs: Any) -> Any: return _decorator(f_py) else: return _decorator + + +@ensure_spice +def create_pointing_frame() -> Path: + """ + Create the pointing frame. + + Returns + ------- + path_to_pointing_frame : Path + Path to dps frame. + """ + # TODO: this part will change with ensure_spice decorator. + # Mount path to EFS. + mount_path = Path(os.getenv("EFS_MOUNT_PATH", "")) + + # TODO: this part will change with ensure_spice decorator. + kernels = [str(file) for file in mount_path.iterdir()] + ck_kernel = [str(file) for file in mount_path.iterdir() if file.suffix == ".bc"] + + # Furnish the kernels. + with spice.KernelPool(kernels): + # Get timerange for the pointing frame kernel. + et_start, et_end, et_times = _get_et_times(str(ck_kernel[0])) + # Create a rotation matrix + rotation_matrix = _create_rotation_matrix(et_times) + + # Convert the rotation matrix to a quaternion. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.m2q + q_avg = spice.m2q(rotation_matrix) + + # TODO: come up with naming convention. + path_to_pointing_frame = mount_path / "imap_dps.bc" + + # Open a new CK file, returning the handle of the opened file. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckopn + handle = spice.ckopn(str(path_to_pointing_frame), "CK", 0) + # Get the SCLK ID. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.gipool + id_imap_sclk = spice.gipool("CK_-43000_SCLK", 0, 1) + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.sce2c + # Convert start and end times to SCLK. + sclk_begtim = spice.sce2c(int(id_imap_sclk), et_start) + sclk_endtim = spice.sce2c(int(id_imap_sclk), et_end) + + # Get the pointing frame ID. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.gipool + id_imap_dps = spice.gipool("FRAME_IMAP_DPS", 0, 1) + + # Create the pointing frame kernel. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckw02 + spice.ckw02( + # Handle of an open CK file. + handle, + # Start time of the segment. + sclk_begtim, + # End time of the segment. + sclk_endtim, + # Pointing frame ID. + int(id_imap_dps), + # Reference frame. + "ECLIPJ2000", # Reference frame + # Identifier. + "IMAP_DPS", + # Number of pointing intervals. + 1, + # Start times of individual pointing records within segment. + # Since there is only a single record this is equal to sclk_begtim. + np.array([sclk_begtim]), + # End times of individual pointing records within segment. + # Since there is only a single record this is equal to sclk_endtim. + np.array([sclk_endtim]), # Single stop time + # Average quaternion. + q_avg, + # 0.0 Angular rotation terms. + np.array([0.0, 0.0, 0.0]), + # Rates (seconds per tick) at which the quaternion and + # angular velocity change. + np.array([1.0]), + ) + + # Close CK file. + spice.ckcls(handle) + + return path_to_pointing_frame + + +@ensure_spice +def _get_et_times(ck_kernel: str) -> tuple[float, float, np.ndarray]: + """ + Get times for pointing start and stop. + + Parameters + ---------- + ck_kernel : str + Path of ck_kernel used to create the pointing frame. + + Returns + ------- + et_start : float + Pointing start time. + et_end : float + Pointing end time. + et_times : numpy.ndarray + Array of times between et_start and et_end. + """ + # Get the spacecraft ID. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.gipool + id_imap_spacecraft = spice.gipool("FRAME_IMAP_SPACECRAFT", 0, 1) + + # TODO: Queried pointing start and stop times here. + + # Get the coverage window + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.ckcov + cover = spice.ckcov(ck_kernel, int(id_imap_spacecraft), True, "SEGMENT", 0, "TDB") + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.wnfetd + et_start, et_end = spice.wnfetd(cover, 0) + # 1 spin/15 seconds; 10 quaternions / spin + num_samples = (et_end - et_start) / 15 * 10 + et_times = np.linspace(et_start, et_end, int(num_samples)) + + return et_start, et_end, et_times + + +@ensure_spice +def _average_quaternions(et_times: np.ndarray) -> NDArray: + """ + Average the quaternions. + + Parameters + ---------- + et_times : numpy.ndarray + Array of times between et_start and et_end. + + Returns + ------- + q_avg : np.ndarray + Average quaternion. + """ + aggregate = np.zeros((4, 4)) + for tdb in et_times: + # we use a quick and dirty method here for grabbing the quaternions + # from the attitude kernel. Depending on how well the kernel input + # data is built and sampled, there may or may not be aliasing with this + # approach. If it turns out that we need to pull the quaternions + # directly from the CK there are several routines that exist to do this + # but it's not straight forward. We'll revisit this if needed. + + # Rotation matrix from IMAP spacecraft frame to ECLIPJ2000. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.pxform + body_rots = spice.pxform("IMAP_SPACECRAFT", "ECLIPJ2000", tdb) + # Convert rotation matrix to quaternion. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.m2q + body_quat = spice.m2q(body_rots) + + # Standardize the quaternion so that they may be compared. + body_quat = body_quat * np.sign(body_quat[0]) + # Aggregate quaternions into a single matrix. + aggregate += np.outer(body_quat, body_quat) + + # Reference: "On Averaging Rotations" + # Link: https://link.springer.com/content/pdf/10.1023/A:1011129215388.pdf + aggregate /= len(et_times) + + # Compute eigen values and vectors of the matrix A + # Eigenvalues tell you how much "influence" each + # direction (eigenvector) has. + # The largest eigenvalue corresponds to the direction + # that has the most influence. + # The eigenvector corresponding to the largest + # eigenvalue points in the direction that has the most + # combined rotation influence. + eigvals, eigvecs = np.linalg.eig(aggregate) + # q0: The scalar part of the quaternion. + # q1, q2, q3: The vector part of the quaternion. + q_avg = eigvecs[:, np.argmax(eigvals)] + + return q_avg + + +@ensure_spice +def _create_rotation_matrix(et_times: np.ndarray) -> NDArray: + """ + Create a rotation matrix. + + Parameters + ---------- + et_times : numpy.ndarray + Array of times between et_start and et_end. + + Returns + ------- + rotation_matrix : np.ndarray + Rotation matrix. + """ + # Averaged quaternions. + q_avg = _average_quaternions(et_times) + + # Converts the averaged quaternion (q_avg) into a rotation matrix + # and get inertial z axis. + # https://spiceypy.readthedocs.io/en/main/documentation.html#spiceypy.spiceypy.q2m + z_avg = spice.q2m(list(q_avg))[:, 2] + # y_avg is perpendicular to both z_avg and the standard Z-axis. + y_avg = np.cross(z_avg, [0, 0, 1]) + # x_avg is perpendicular to y_avg and z_avg. + x_avg = np.cross(y_avg, z_avg) + + # Construct the rotation matrix from x_avg, y_avg, z_avg + rotation_matrix = np.asarray([x_avg, y_avg, z_avg]) + + return rotation_matrix diff --git a/imap_processing/tests/spice/test_kernels.py b/imap_processing/tests/spice/test_kernels.py index c8ce926ad..29b6ec675 100644 --- a/imap_processing/tests/spice/test_kernels.py +++ b/imap_processing/tests/spice/test_kernels.py @@ -1,10 +1,19 @@ """Tests coverage for imap_processing/spice/kernels.py""" +import os + +import numpy as np import pytest import spiceypy as spice from spiceypy.utils.exceptions import SpiceyError from imap_processing.spice import kernels +from imap_processing.spice.kernels import ( + _average_quaternions, + _create_rotation_matrix, + _get_et_times, + create_pointing_frame, +) @kernels.ensure_spice @@ -67,3 +76,95 @@ def test_ensure_spice_key_error(): # functions that it decorates. with pytest.raises(SpiceyError): _ = wrapped(577365941.184, "ISOC", 3) == "2018-04-18T23:24:31.998" + + +@pytest.fixture() +def kernels(spice_test_data_path): + """Create kernel list.""" + kernels = [str(file) for file in spice_test_data_path.iterdir()] + + return kernels + + +@pytest.fixture() +def ck_kernel(spice_test_data_path): + """Location of ck kernel to create pointing kernel from.""" + return spice_test_data_path / "imap_spin.bc" + + +@pytest.fixture() +def et_times(ck_kernel, kernels): + """Tests get_et_times function.""" + + with spice.KernelPool(kernels): + et_start, et_end, et_times = _get_et_times(str(ck_kernel)) + + return et_times + + +def test_get_et_times(kernels, ck_kernel): + """Tests get_et_times function.""" + + with spice.KernelPool(kernels): + et_start, et_end, et_times = _get_et_times(str(ck_kernel)) + + assert et_start == 802008069.184905 + assert et_end == 802015267.184906 + assert et_times[0] == et_start + assert et_times[-1] == et_end + + +def test_average_quaternions(et_times, kernels): + """Tests average_quaternions function.""" + + with spice.KernelPool(kernels): + q_avg = _average_quaternions(et_times) + + # Generated from MATLAB code results + q_avg_expected = np.array([-0.6611, 0.4981, -0.5019, -0.2509]) + np.testing.assert_allclose(q_avg, q_avg_expected, atol=1e-4) + + +def test_create_rotation_matrix(et_times, kernels): + """Tests create_rotation_matrix function.""" + + with spice.KernelPool(kernels): + rotation_matrix = _create_rotation_matrix(et_times) + q_avg = _average_quaternions(et_times) + z_avg = spice.q2m(list(q_avg))[:, 2] + + rotation_matrix_expected = np.array( + [[0.0000, 0.0000, 1.0000], [0.9104, -0.4136, 0.0000], [0.4136, 0.9104, 0.0000]] + ) + z_avg_expected = np.array([0.4136, 0.9104, 0.0000]) + + np.testing.assert_allclose(z_avg, z_avg_expected, atol=1e-4) + np.testing.assert_allclose(rotation_matrix, rotation_matrix_expected, atol=1e-4) + + +def test_create_pointing_frame(monkeypatch, spice_test_data_path, ck_kernel, tmp_path): + """Tests create_pointing_frame function.""" + monkeypatch.setenv("EFS_MOUNT_PATH", str(spice_test_data_path)) + create_pointing_frame() + + # After imap_dps.bc has been created. + kernels = [str(file) for file in spice_test_data_path.iterdir()] + + with spice.KernelPool(kernels): + et_start, et_end, et_times = _get_et_times(str(ck_kernel)) + + rotation_matrix_1 = spice.pxform("ECLIPJ2000", "IMAP_DPS", et_start + 100) + rotation_matrix_2 = spice.pxform("ECLIPJ2000", "IMAP_DPS", et_start + 1000) + + # All the rotation matrices should be the same. + assert np.array_equal(rotation_matrix_1, rotation_matrix_2) + + # Nick Dutton's MATLAB code result + rotation_matrix_expected = np.array( + [[0.0000, 0.0000, 1.0000], [0.9104, -0.4136, 0.0000], [0.4136, 0.9104, 0.0000]] + ) + np.testing.assert_allclose(rotation_matrix_1, rotation_matrix_expected, atol=1e-4) + + # Verify imap_dps.bc has been created. + assert (spice_test_data_path / "imap_dps.bc").exists() + os.remove(spice_test_data_path / "imap_dps.bc")