Source code for adam_core.orbits.spice_kernel

import logging
from typing import Dict, Optional

import numpy as np
import quivr as qv
import spiceypy as sp
from astropy import units as u
from naif_de440 import de440
from naif_earth_itrf93 import earth_itrf93
from naif_eop_high_prec import eop_high_prec
from naif_eop_historical import eop_historical
from naif_eop_predict import eop_predict
from naif_leapseconds import leapseconds

from ..coordinates.cartesian import CartesianCoordinates
from ..coordinates.origin import OriginCodes
from ..coordinates.transform import transform_coordinates
from ..orbits import Orbits
from ..propagator import Propagator
from ..time import Timestamp
from ..utils.spice import setup_SPICE

DEFAULT_KERNELS = [
    leapseconds,
    de440,
    eop_predict,
    eop_historical,
    eop_high_prec,
    earth_itrf93,
]

# Add after DEFAULT_KERNELS
logger = logging.getLogger(__name__)

J2000_TDB_JD = 2451545.0


[docs] def fit_chebyshev( coordinates: CartesianCoordinates, window_start: float, window_end: float, degree: int, ) -> tuple[np.ndarray, float, float]: """ Fit Chebyshev polynomials to position and velocity data. Parameters ---------- coordinates : CartesianCoordinates Coordinates to fit window_start : float Start time in ET seconds window_end : float End time in ET seconds degree : int Degree of Chebyshev polynomials Returns ------- tuple (coefficients, mid_time, half_interval) coefficients has shape (6, degree+1) """ # Get states for this window and convert to km and km/s et_times = coordinates.time.et().to_numpy() mask = (et_times >= window_start) & (et_times <= window_end) states = coordinates.values[mask].copy() states[:, :3] *= 149597870.7 # au to km states[:, 3:] *= 149597870.7 / 86400.0 # au/day to km/s times = coordinates.time.et().to_numpy()[mask] # Scale time to [-1, 1] interval mid_time = (window_end + window_start) / 2 half_interval = (window_end - window_start) / 2 scaled_times = (times - mid_time) / half_interval # Compute Chebyshev polynomials T = np.zeros((len(times), degree + 1)) T[:, 0] = 1 T[:, 1] = scaled_times for i in range(2, degree + 1): T[:, i] = 2 * scaled_times * T[:, i - 1] - T[:, i - 2] # Fit polynomials to each component coefficients = np.zeros((6, degree + 1)) for i in range(6): coefficients[i] = np.linalg.lstsq(T, states[:, i], rcond=None)[0] return coefficients, mid_time, half_interval
[docs] def orbits_to_spk( orbits: Orbits, output_file: str, start_time: Timestamp, end_time: Timestamp, propagator: Optional[Propagator] = None, max_processes: Optional[int] = None, step_days: float = 1.0 / 4, # Every 6 hours target_id_start: int = 1000000, window_days: float = 32.0, comment: str = "SPK file generated from adam_core Orbits", kernel_type: str = "w03", ) -> Dict[str, int]: """ Convert Orbits object to a SPICE SPK file using Chebyshev polynomials (Type 3). """ logger.info(f"Creating SPK file: {output_file}") logger.info( f"Time range: {start_time.to_astropy().isot} to {end_time.to_astropy().isot}" ) logger.info(f"Kernel type: {kernel_type}") # ensure SPICE is ready to go setup_SPICE() # default to a chebyshev degree of 15 cheby_degree = 15 # Generate propagation times start_mjd = start_time.mjd().to_numpy(zero_copy_only=False).item() end_mjd = end_time.mjd().to_numpy(zero_copy_only=False).item() num_steps = int((end_mjd - start_mjd) / step_days) + 1 logger.debug(f"Generated {num_steps} time steps with step size {step_days} days") times = qv.concatenate( [start_time.add_fractional_days(i * step_days) for i in range(num_steps)] ) # Propagate orbits if propagator provided if propagator is not None: logger.debug("Propagating orbits...") orbits = propagator.propagate_orbits( orbits, times, max_processes=max_processes, chunk_size=1 ) logger.debug("Orbit propagation complete") # Transform everything to a Sun origin and # ecliptic frame # Verify all orbits have the same origin ssb_coordinates = transform_coordinates( orbits.coordinates, frame_out="equatorial", origin_out=OriginCodes.SOLAR_SYSTEM_BARYCENTER, ) orbits = orbits.set_column("coordinates", ssb_coordinates) # Create the SPK file handle = sp.spkopn(output_file, comment, 0) target_id_mappings = {} for i, (orbit_id, orbit) in enumerate(orbits.group_by_orbit_id()): logger.debug( f"Processing orbit {orbit_id} ({i} / ({len(orbits.orbit_id.unique())})" ) # ensure orbit is sorted by time orbit = orbit.sort_by(["coordinates.time.days", "coordinates.time.nanos"]) target_id = target_id_start + i target_id_mappings[orbit_id] = target_id logger.debug(f"Orbit {orbit_id} -> Target ID: {target_id}") # Get time range for this orbit orbit_start = orbit.coordinates.time.min().et()[0].as_py() orbit_end = orbit.coordinates.time.max().et()[0].as_py() if kernel_type == "w03": write_spkw03_segment( orbit, handle, target_id, orbit_start, orbit_end, window_days * 86400.0, cheby_degree, ) elif kernel_type == "w09": write_spkw09_segment( orbit, handle, target_id, orbit_start, orbit_end, ) else: raise ValueError(f"Invalid kernel type: {kernel_type}") # Close the SPK file sp.spkcls(handle) logger.info(f"Successfully created SPK file: {output_file}") return target_id_mappings
[docs] def write_spkw03_segment( propagated_orbit: Orbits, handle: int, target_id: int, start_time: float, end_time: float, window_seconds: float = 86400.0, cheby_degree: int = 15, ) -> None: logger.debug(f"Writing SPK type 03 segment for target ID {target_id}") # assert orbit id is unique assert propagated_orbit.orbit_id.unique() spk_frame = ( "J2000" if propagated_orbit.coordinates.frame == "equatorial" else "ECLIPJ2000" ) segment_id = ( str(propagated_orbit.orbit_id[0].as_py()) + "_" + str(start_time) + "_" + str(end_time) ) # SPICE has a limit of 40 characters for the segment id segment_id = segment_id[:40] num_windows = int(np.ceil((end_time - start_time) / window_seconds)) logger.debug(f"Fitting {num_windows} Chebyshev windows") # Initialize arrays for SPKW03 cheby_coeffs = [] window_starts = [] for w in range(num_windows): start_time_window = start_time + w * window_seconds end_time_window = min(start_time_window + window_seconds, end_time) coeffs, mid_time, half_interval = fit_chebyshev( propagated_orbit.coordinates, start_time_window, end_time_window, cheby_degree, ) cheby_coeffs.append(coeffs.flatten()) window_starts.append(start_time) # Convert to numpy arrays cheby_coeffs = np.array(cheby_coeffs) window_starts = np.array(window_starts) # Write the SPKW03 segment sp.spkw03( handle, target_id, propagated_orbit.coordinates.origin.as_OriginCodes().value, spk_frame, start_time, end_time, segment_id, window_seconds, len(cheby_coeffs), cheby_degree, cheby_coeffs.flatten(), window_starts[0], )
[docs] def write_spkw09_segment( propagated_orbit: Orbits, handle: int, target_id: int, start_time: float, end_time: float, ) -> None: logger.debug(f"Writing SPK type 09 segment for target ID {target_id}") # assert orbit id is unique assert propagated_orbit.orbit_id.unique() spk_frame = ( "J2000" if propagated_orbit.coordinates.frame == "equatorial" else "ECLIPJ2000" ) segment_id = ( str(propagated_orbit.orbit_id[0].as_py()) + "_" + str(start_time) + "_" + str(end_time) ) # SPICE has a limit of 40 characters for the segment id segment_id = segment_id[:40] epochs_tdb = ( propagated_orbit.coordinates.time.rescale("tdb") .jd() .to_numpy(zero_copy_only=False) ) epochs_et = np.array([sp.str2et(f"JD {i:.15f} TDB".format(i)) for i in epochs_tdb]) states = propagated_orbit.coordinates.values states[:, 0:6] *= u.au.to(u.km) states[:, 3:6] /= (u.d).to(u.s) sp.spkw09( handle, target_id, propagated_orbit.coordinates.origin.as_OriginCodes().value, spk_frame, epochs_et[0], epochs_et[-1], segment_id, 15, len(epochs_et), np.ascontiguousarray(states), epochs_et, )