Source code for adam_core.dynamics.moid

"""
Implementation of SGD-MOID

References
----------
[1] Hedo, J. M. et al. (2019). Minimum orbital intersection distance: an asymptotic approach.
    Astronomy & Astrophysics, 633, A22. https://doi.org/10.1051/0004-6361/201936502

Algorithm 1:
From an arbitrary point P_0 on the primary ellipse,
project the point onto the plane of the other ellipse.

Then find the minimum distance from the projected point,
using a minimization function.

Algorithm 2:

1. Divide up the primary ellipse into a series of points
2. For each point, run algorithm 1
3. After finding the point with the lowest distance,
search for the overall minimum.
"""

from typing import List, Optional, Union

import numpy as np
import numpy.typing as npt
import quivr as qv
import ray
from ray import ObjectRef
from scipy.optimize import minimize_scalar

from ..coordinates import (
    CartesianCoordinates,
    KeplerianCoordinates,
    Origin,
    OriginCodes,
)
from ..dynamics.propagation import _propagate_2body
from ..orbits import Orbits
from ..ray_cluster import initialize_use_ray
from ..time import Timestamp
from ..utils.iter import _iterate_chunks
from ..utils.spice import get_perturber_state


[docs] class PerturberMOIDs(qv.Table): orbit_id = qv.LargeStringColumn() perturber = Origin.as_column() moid = qv.Float64Column() time = Timestamp.as_column()
[docs] def project_point_on_plane( P0: npt.NDArray, plane_coordinates: CartesianCoordinates ) -> npt.NDArray: """ Take a point P0 (from the secondary ellipse) and project it onto the plane of the primary ellipse. """ assert len(plane_coordinates) == 1 n_hat = plane_coordinates.h[0] / plane_coordinates.h_mag[0] projected_point = P0 - np.dot(P0, n_hat) * n_hat return projected_point
[docs] def coplanar_distance_to_ellipse( P: npt.NDArray, keplerian_coordinates: KeplerianCoordinates, u: float ) -> float: """ Calculate the distance from point P on the plane to the ellipse. """ a, e, i, ap, raan, M = keplerian_coordinates.values[0] # Calculate the eccentric anomaly E = 2 * np.arctan(np.sqrt((1 - e) / (1 + e)) * np.tan(u / 2)) # Calculate the true anomaly v = 2 * np.arctan(np.sqrt((1 + e) / (1 - e)) * np.tan(u / 2)) # Calculate the radius r = a * (1 - e * np.cos(E)) # Calculate the position vector r_vec = np.array([r * np.cos(v), r * np.sin(v), 0]) # Calculate the distance from the point to the ellipse distance = np.linalg.norm(P - r_vec) return distance
[docs] def minimize_distance_from_coplanar_point_to_ellipse( P: npt.NDArray, keplerian_coordinates: KeplerianCoordinates ): """ Find the minimum distance to the ellipse from planar point P We only need the magnitude of the distance here to calculate distance from P0 to closest point on ellipse """ # Minimize the distance function with respect to an angle u result = minimize_scalar( lambda u: coplanar_distance_to_ellipse(P, keplerian_coordinates, u), bounds=(0, 2 * np.pi), method="bounded", tol=1e-12, ) return result.fun
[docs] def distance_from_point_to_ellipse( P0: npt.NDArray, P: npt.NDArray, d_parallel: float ) -> float: """ Takes the parallel and perpendicular distances from the point to the ellipse and calculates the total distance """ d_perp = np.linalg.norm(P - P0) distance = np.sqrt(d_perp**2 + d_parallel**2) return distance
[docs] def calculate_distance_from_point_to_ellipse(P0: npt.NDArray, primary_ellipse: Orbits): """ Calculate the distance from point P0 (from first ellipse) to the closest point on the ellipse """ # Project the point onto the plane of the other ellipse P = project_point_on_plane(P0, primary_ellipse.coordinates) # Find the minimum distance from the projected point to the ellipse d_parallel = minimize_distance_from_coplanar_point_to_ellipse( P, primary_ellipse.coordinates.to_keplerian() ) # Calculate the distance from P0 to the closest point on the ellipse distance = distance_from_point_to_ellipse(P0, P, d_parallel) return distance
[docs] def calculate_moid_for_dt( primary_ellipse: Orbits, secondary_ellipse: Orbits, dt: float ): # Propagate the primary ellipse by dt t_0 = primary_ellipse.coordinates.time.mjd()[0].as_py() primary_ellipse_propagated = _propagate_2body( primary_ellipse.coordinates.values[0], t_0, t_0 + dt, primary_ellipse.coordinates.origin.mu()[0], ) P0 = primary_ellipse_propagated[:3] distance_to_secondary_ellipse = calculate_distance_from_point_to_ellipse( P0, secondary_ellipse ) return distance_to_secondary_ellipse
# Now for Algorithm 1, where we discretize the primary ellipse and run Algorithm 1 for each point
[docs] def calculate_moid( primary_ellipse: Orbits, secondary_ellipse: Orbits ) -> tuple[float, Timestamp]: """ Calculate the Minimum Orbit Intersection Distance (MOID) between two orbits. """ keplerian = primary_ellipse.coordinates.to_keplerian() period = keplerian.P[0] e = keplerian.e[0].as_py() if e < 1: bounds = (0, period) else: bounds = (0, 10000) result = minimize_scalar( lambda dt: calculate_moid_for_dt(primary_ellipse, secondary_ellipse, dt), bounds=bounds, method="bounded", tol=1e-14, ) moid = result.fun moid_time = Timestamp.from_mjd( [primary_ellipse.coordinates.time.mjd()[0].as_py() + result.x], scale=primary_ellipse.coordinates.time.scale, ) if result.status != 0: raise ValueError("MOID calculation did not converge.") return moid, moid_time
[docs] def moid_worker( idx_chunk: npt.NDArray[np.int64], orbits: Orbits, perturber: OriginCodes ) -> PerturberMOIDs: """ Calculate the MOID for a chunk of orbits with respect to a perturbing body. """ orbits_chunk = orbits.take(idx_chunk) states = get_perturber_state( perturber, orbits_chunk.coordinates.time, frame=orbits_chunk.coordinates.frame, origin=orbits_chunk.coordinates.origin[0].as_OriginCodes(), ) moids = PerturberMOIDs.empty() for orbit, state in zip(orbits_chunk, states): moid, moid_time = calculate_moid( orbit, Orbits.from_kwargs(orbit_id=[perturber.name], coordinates=state) ) moids_i = PerturberMOIDs.from_kwargs( orbit_id=orbit.orbit_id, perturber=Origin.from_kwargs(code=[perturber.name]), moid=[moid], time=moid_time, ) moids = qv.concatenate([moids, moids_i]) return moids
moid_worker_ray = ray.remote(moid_worker)
[docs] def calculate_perturber_moids( orbits: Orbits, perturber: Union[OriginCodes, List[OriginCodes]], chunk_size: int = 100, max_processes: Optional[int] = 1, ) -> PerturberMOIDs: """ Calculate the minimum orbit intersection distance (MOID) for all orbits with respect to the perturbing body or bodies. Parameters ---------- orbits : Orbits The orbits to calculate the MOID for. perturber : OriginCodes or List[OriginCodes] The perturbing body or bodies to calculate the MOID for. chunk_size : int, optional The number of orbits to process in each chunk, by default 100. max_processes : int, optional The maximum number of processes to use, by default 1. Returns ------- PerturberMOIDs A table containing the MOID and time for each orbit with respect to the perturbing body or bodies. """ assert len(orbits.orbit_id.unique()) == len(orbits) assert len(orbits.coordinates.origin.code.unique()) == 1 if isinstance(perturber, OriginCodes): perturbers = [perturber] else: perturbers = perturber moids = PerturberMOIDs.empty() use_ray = initialize_use_ray(num_cpus=max_processes) if use_ray: if not isinstance(orbits, ObjectRef): orbits_ref = ray.put(orbits) else: orbits_ref = orbits orbits = ray.get(orbits_ref) futures_inputs = [] idx = np.arange(0, len(orbits)) for perturber in perturbers: for idx_chunk in _iterate_chunks(idx, chunk_size): futures_inputs.append( ( idx_chunk, orbits_ref, perturber, ) ) futures = [] for future_input in futures_inputs: futures.append(moid_worker_ray.remote(*future_input)) if len(futures) >= max_processes * 1.5: finished, futures = ray.wait(futures, num_returns=1) result = ray.get(finished[0]) moids = qv.concatenate([moids, result]) while futures: finished, futures = ray.wait(futures, num_returns=1) result = ray.get(finished[0]) moids = qv.concatenate([moids, result]) else: idx = np.arange(0, len(orbits)) for perturber in perturbers: for idx_chunk in _iterate_chunks(idx, chunk_size): moids_i = moid_worker(idx_chunk, orbits, perturber) moids = qv.concatenate([moids, moids_i]) return moids