Skip to content

Commit

Permalink
Remove _ensured_spkezr funciton.
Browse files Browse the repository at this point in the history
Remove SpiceId(NamedTuple)
Parameterize test to cover single and iterable ET input
Change typing on ensure_spice to apease mypy
Fix up ensure_spice decorator factory typing
  • Loading branch information
subagonsouth committed Sep 4, 2024
1 parent f3ec1af commit 0b87688
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 120 deletions.
127 changes: 43 additions & 84 deletions imap_processing/spice/geometry.py
Original file line number Diff line number Diff line change
@@ -1,65 +1,66 @@
"""Functions for computing geometry, many of which use SPICE."""
"""
Functions for computing geometry, many of which use SPICE.
Paradigms for developing this module:
* Use @ensure_spice decorator on functions that directly wrap spiceypy functions
* Vectorize everything at the lowest level possible (e.g. the decorated spiceypy
wrapper function)
* Always return numpy arrays for vectorized calls.
"""

from collections.abc import Iterable
from enum import Enum
from typing import NamedTuple, Optional, Union
from typing import Union

import numpy as np
import spiceypy as spice

from imap_processing.spice.kernels import ensure_spice


class SpiceId(NamedTuple):
"""Class that represents a unique identifier in the NAIF SPICE library."""

strid: str
numid: int


class SpiceBody(Enum):
"""Enum containing SPICE IDs for bodies that we use."""

# A subset of IMAP Specific bodies as defined in imap_wkcp.tf
IMAP = SpiceId("IMAP", -43)
IMAP_SPACECRAFT = SpiceId("IMAP_SPACECRAFT", -43000)
IMAP = -43
IMAP_SPACECRAFT = -43000
# IMAP Pointing Frame (Despun) as defined in iamp_science_0001.tf
IMAP_DPS = SpiceId("IMAP_DPS", -43901)
IMAP_DPS = -43901
# Standard NAIF bodies
SSB = SpiceId("SOLAR_SYSTEM_BARYCENTER", 0)
SUN = SpiceId("SUN", 10)
EARTH = SpiceId("EARTH", 399)
SOLAR_SYSTEM_BARYCENTER = 0
SUN = 10
EARTH = 399


class SpiceFrame(Enum):
"""Enum containing SPICE IDs for reference frames, defined in imap_wkcp.tf."""

# Standard SPICE Frames
J2000 = SpiceId("J2000", 1)
ECLIPJ2000 = SpiceId("ECLIPJ2000", 17)
# IMAP specific
IMAP_SPACECRAFT = SpiceId("IMAP_SPACECRAFT", -43000)
IMAP_LO_BASE = SpiceId("IMAP_LO_BASE", -43100)
IMAP_LO_STAR_SENSOR = SpiceId("IMAP_LO_STAR_SENSOR", -43103)
IMAP_LO = SpiceId("IMAP_LO", -43105)
IMAP_HI_45 = SpiceId("IMAP_HI_45", -43150)
IMAP_HI_90 = SpiceId("IMAP_HI_90", -43160)
IMAP_ULTRA_45 = SpiceId("IMAP_ULTRA_45", -43200)
IMAP_ULTRA_90 = SpiceId("IMAP_ULTRA_90", -43210)
IMAP_MAG = SpiceId("IMAP_MAG", -43250)
IMAP_SWE = SpiceId("IMAP_SWE", -43300)
IMAP_SWAPI = SpiceId("IMAP_SWAPI", -43350)
IMAP_CODICE = SpiceId("IMAP_CODICE", -43400)
IMAP_HIT = SpiceId("IMAP_HIT", -43500)
IMAP_IDEX = SpiceId("IMAP_IDEX", -43700)
IMAP_GLOWS = SpiceId("IMAP_GLOWS", -43750)


J2000 = 1
ECLIPJ2000 = 17
# IMAP specific as defined in imap_wkcp.tf
IMAP_SPACECRAFT = -43000
IMAP_LO_BASE = -43100
IMAP_LO_STAR_SENSOR = -43103
IMAP_LO = -43105
IMAP_HI_45 = -43150
IMAP_HI_90 = -43160
IMAP_ULTRA_45 = -43200
IMAP_ULTRA_90 = -43210
IMAP_MAG = -43250
IMAP_SWE = -43300
IMAP_SWAPI = -43350
IMAP_CODICE = -43400
IMAP_HIT = -43500
IMAP_IDEX = -43700
IMAP_GLOWS = -43750


@ensure_spice
def imap_state(
et: Union[np.ndarray, float],
ref_frame: Optional[SpiceFrame] = None,
observer: Optional[SpiceBody] = None,
) -> Union[np.ndarray, Iterable[np.ndarray]]:
ref_frame: SpiceFrame = SpiceFrame.ECLIPJ2000,
observer: SpiceBody = SpiceBody.SUN,
) -> np.ndarray:
"""
Get the state (position and velocity) of the IMPA spacecraft.
Expand All @@ -76,53 +77,11 @@ def imap_state(
Returns
-------
state : np.ndarray or Iterable[np.ndarray]
state : np.ndarray
The Cartesian state vector representing the position and velocity of the
IMAP spacecraft.
"""
if ref_frame is None:
ref_frame = SpiceFrame.ECLIPJ2000
if observer is None:
observer = SpiceBody.SUN
state, light_time = ensured_spkezr(
state, _ = spice.spkezr(
SpiceBody.IMAP.name, et, ref_frame.name, "NONE", observer.name
)
return state


def ensured_spkezr(
targ: str, et: Union[np.ndarray, float], ref: str, abcorr: str, obs: str
) -> Union[tuple[np.ndarray, float], tuple[Iterable[np.ndarray], Iterable[float]]]:
"""
Wrap spice.spkezr() function with ensure_spice.
Parameters
----------
targ : str
Target body name.
et : ndarray or float
J2000 observer times.
ref : str
Reference frame name.
abcorr : str
Aberration correction method.
obs : str
Observing body name.
Returns
-------
state : np.ndarray or Iterable[np.ndarray]
State of target.
light_time : float or Iterable[float]
One way light time between observer and target.
Notes
-----
https://naif.jpl.nasa.gov/pub/naif/toolkit_docs/FORTRAN/spicelib/spkezr.html
"""
# No vectorization is needed b/c spiceypy already adds vectorization to the
# spkezr function. If specific time coverage functionality is added to
# @ensure_spice, parameters can be added here.
ensured = ensure_spice(spice.spkezr)
state, light_time = ensured(targ, et, ref, abcorr, obs)
return state, light_time
return np.asarray(state)
34 changes: 21 additions & 13 deletions imap_processing/spice/kernels.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,38 @@
from collections.abc import Generator
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Callable, Optional
from typing import Callable, Optional, ParamSpec, TypeVar, overload

import numpy as np
import spiceypy as spice
from numpy.typing import NDArray
from spiceypy.utils.exceptions import SpiceyError

P = ParamSpec("P")
T = TypeVar("T")
logger = logging.getLogger(__name__)


# What is going on here? Taken from mypy help with declaring decorators
# https://mypy.readthedocs.io/en/stable/generics.html#decorator-factories
# Bare decorator usage
@overload
def ensure_spice(__func: Callable[P, T]) -> Callable[P, T]: ... # numpydoc ignore=GL08
# Decorator with arguments
@overload
def ensure_spice(
f_py: Optional[Callable] = None, time_kernels_only: bool = False
) -> Callable:
*, time_kernels_only: bool = False
) -> Callable[[Callable[P, T]], Callable[P, T]]: ... # numpydoc ignore=GL08
# Implementation
def ensure_spice(
__func: Optional[Callable[P, T]] = None, *, time_kernels_only: bool = False
) -> Callable[P, T] | Callable[[Callable[P, T]], Callable[P, T]]:
"""
Decorator/wrapper that automatically furnishes SPICE kernels.
Parameters
----------
f_py : Callable
__func : Callable
The function requiring SPICE that we are going to wrap if being used
explicitly, otherwise None, in which case ensure_spice is being used,
not as a function wrapper (see l2a_processing.py) but as a true
Expand Down Expand Up @@ -82,13 +95,8 @@ def ensure_spice(
>>> wrapped = ensure_spice(spicey_func, time_kernels_only=True)
... result = wrapped(*args, **kwargs)
"""
if f_py and not callable(f_py):
raise ValueError(
f"Received a non-callable object {f_py} as the f_py argument to"
f"ensure_spice. f_py must be a callable object."
)

def _decorator(func: Callable[..., Callable]) -> Callable:
def _decorator(func: Callable[P, T]) -> Callable[P, T]:
"""
Decorate or wrap input function depending on how ensure_spice is used.
Expand All @@ -104,7 +112,7 @@ def _decorator(func: Callable[..., Callable]) -> Callable:
"""

@functools.wraps(func)
def wrapper_ensure_spice(*args: Any, **kwargs: Any) -> Any:
def wrapper_ensure_spice(*args: P.args, **kwargs: P.kwargs) -> T:
"""
Wrap the function that ensure_spice is used on.
Expand Down Expand Up @@ -157,8 +165,8 @@ def wrapper_ensure_spice(*args: Any, **kwargs: Any) -> Any:
# Note: This return was originally implemented as a ternary operator, but
# this caused mypy to fail due to this bug:
# https://github.com/python/mypy/issues/4134
if callable(f_py):
return _decorator(f_py)
if callable(__func):
return _decorator(__func)
else:
return _decorator

Expand Down
36 changes: 13 additions & 23 deletions imap_processing/tests/spice/test_geometry.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,25 @@
"""Tests coverage for imap_processing/spice/geometry.py"""

import numpy as np
import pytest

from imap_processing.spice.geometry import (
SpiceBody,
SpiceFrame,
ensured_spkezr,
imap_state,
)


def test_imap_state(use_test_metakernel):
@pytest.mark.parametrize(
"et",
[
798033670,
np.linspace(798033670, 798033770),
],
)
def test_imap_state(et, use_test_metakernel):
"""Test coverage for imap_state()"""
et = np.linspace(798033670, 798034670)
state = imap_state(et, observer=SpiceBody.EARTH)
assert len(state) == len(et)


def test_ensured_spkezr(use_test_metakernel):
"""Test coverage for ensured_spkezr()"""
# The imap_spk_demo.bsp kernel provides ephemeris coverage for 2025-04-15 to
# 2026-04-16. The kernel provides the IMAP ephemeris relative to Earth, so
# only the position relative to Earth can be queried without loading
# additional kernels.
# The queried ET, 798033670 is ~2025-04-16T00:00:00.0
state, lt = ensured_spkezr(
SpiceBody.IMAP.name,
798033670,
SpiceFrame.ECLIPJ2000.name,
"NONE",
SpiceBody.EARTH.name,
)
assert state.shape == (6,)
assert isinstance(lt, float)
if hasattr(et, "__len__"):
np.testing.assert_array_equal(state.shape, (len(et), 6))
else:
assert state.shape == (6,)

0 comments on commit 0b87688

Please sign in to comment.