Source code for adam_core.orbits.query.neocc

from typing import Any, Dict, List, Literal, Union

import numpy as np
import numpy.typing as npt
import quivr as qv
import requests

from adam_core.coordinates.covariances import _upper_triangular_to_full

from ...coordinates import CoordinateCovariances, KeplerianCoordinates, Origin
from ...orbits import Orbits
from ...time import Timestamp
from ..physical_parameters import PhysicalParameters


def _remove_last_column_of_upper_triangular(
    matrix: npt.NDArray[np.float64],
) -> npt.NDArray[np.float64]:
    """
    Drop the last column of the 28-element upper triangular matrix.

    The dropped column likely corresponds to magnitude. For example, see "2018 CW2".

    Parameters:
    -----------
    matrix: npt.NDArray[np.float64]
        28-element upper triangular matrix
    Returns:
    --------
    21-element upper triangular matrix.
    """
    assert len(matrix) == 28
    full = np.zeros((7, 7))
    full[np.triu_indices(7)] = matrix
    full = full[:6, :6]  # just drop the last one
    return full[np.triu_indices(6)]


def _parse_oef(data: str) -> Dict[str, Any]:
    """
    Parse a OEF file and return the stored orbital elements.

    Parameters
    ----------
    data: str
        The content of the OEF file.

    Returns
    -------
    Dict[str, Any]
        Dictionary containing the parsed orbital elements and metadata.


    Examples
    --------
    format  = 'OEF2.0'       ! file format
    rectype = 'ML'           ! record type (1L/ML)
    refsys  = ECLM J2000     ! default reference system
    END_OF_HEADER
    2024YR4
    ! Keplerian elements: a, e, i, long. node, arg. peric., mean anomaly
     KEP   2.5158127507489616E+00  6.6154036821914619E-01   3.4081393687180 271.3655954496424 134.3614240204325  4.0403920526717883E+01
     MJD     60800.000000000 TDT
     MAG  23.876  0.150
    ! Non-gravitational parameters: model used, number of model parameters, dimension
     LSP   0  0    6
    ! PERIHELION   8.5150105724807035E-01
    ! APHELION   4.1801244442498522E+00
    ! ANODE   1.6132920678553648E+00
    ! DNODE  -1.6139338737144644E-02
    ! MOID   2.8281976977061222E-03
    ! PERIOD   1.4575246142278593E+03
    ! PHA F
    ! VINFTY    14.2161102117779
    ! U_PAR 5.5
    ! ORB_TYPE Apollo
    ! RMS    1.45945E-04   2.08511E-05   7.80533E-05   1.08159E-05   9.07220E-05   3.57225E-03
     COV   2.129990103626278E-08  3.043103695236090E-09  1.138994073085263E-08
     COV  -1.297300567885438E-09 -1.321094812357632E-08 -5.213516734886736E-07
     COV   4.347664336862408E-10  1.627274457493249E-09 -1.853526029216412E-10
     COV  -1.887473042074563E-09 -7.448518590163292E-08  6.092321667952164E-09
     COV  -6.879908552449990E-10 -7.069797588501348E-09 -2.787885242390572E-07
     COV   1.169847013231039E-10  7.781995171805923E-10  3.175091792990313E-08
     COV   8.230474520730192E-09  3.233601008844550E-07  1.276097850815426E-05
     COR   1.000000000000000E+00  9.999998967955239E-01  9.998647520507938E-01
     COR  -8.218400094356861E-01 -9.977753189147101E-01 -9.999999781092901E-01
     COR   9.999999999999999E-01  9.998650578225570E-01 -8.218757198089662E-01
     COR  -9.977926872410703E-01 -9.999998019539900E-01  9.999999999999998E-01
     COR  -8.149420314930980E-01 -9.983966836512449E-01 -9.998652884032685E-01
     COR   1.000000000000000E+00  7.930744967922404E-01  8.217690139449345E-01
     COR   1.000000000000000E+00  9.977735927640640E-01  1.000000000000000E+00

    """
    lines = data.strip().split("\n")
    result = {}

    # Parse header
    header = {}
    for line in lines:
        line = line.strip()
        if line == "END_OF_HEADER":
            break
        if "=" in line:
            key, value = line.split("=", 1)
            header[key.strip()] = value.split("!")[0].strip().strip("'")
    result["header"] = header

    # Find object ID
    for line in lines:
        if not line.startswith(
            ("!", " ", "format", "rectype", "refsys", "END_OF_HEADER")
        ):
            result["object_id"] = line.strip()
            break

    # Parse Keplerian elements
    for line in lines:
        if line.strip().startswith("KEP"):
            elements = line.split()[1:]
            result["elements"] = {
                "a": float(elements[0]),  # semi-major axis
                "e": float(elements[1]),  # eccentricity
                "i": float(elements[2]),  # inclination
                "node": float(elements[3]),  # longitude of ascending node
                "peri": float(elements[4]),  # argument of perihelion
                "M": float(elements[5]),  # mean anomaly
            }

    # Parse epoch
    for line in lines:
        if line.strip().startswith("MJD"):
            result["epoch"] = float(line.split()[1])
            result["time_system"] = line.split()[2]

    # Parse magnitude: OEF MAG line is (absolute magnitude H, slope parameter G), V-band.
    # Ref: ESA NEOCC Automated Data Access, https://neo.ssa.esa.int/computer-access
    for line in lines:
        if line.strip().startswith("MAG"):
            mag_data = line.split()[1:]
            result["magnitude"] = {
                "H": float(mag_data[0]),
                "G": float(mag_data[1]),
            }

    # Parse derived parameters (marked with !)
    derived = {}
    for line in lines:
        if line.strip().startswith("!") and len(line.split()) >= 3:
            key = line.split()[1].lower()
            try:
                value = float(line.split()[2])
                derived[key] = value
            except ValueError:
                derived[key] = line.split()[2]
    result["derived"] = derived

    # Parse covariance matrix
    cov_matrix = []
    for line in lines:
        if line.strip().startswith("COV"):
            cov_matrix.extend([float(x) for x in line.split()[1:]])
    if cov_matrix:
        # Upper triangular matrix with order
        # (1,1)   (1,2)   (1,3)
        # (1,4)   (1,5)   (1,6)
        # (2,2)   (2,3)   (2,4)
        # (2,5)   (2,6)   (3,3)
        # (3,4)   (3,5)   (3,6)
        # (4,4)   (4,5)   (4,6)
        # (5,5)   (5,6)   (6,6)
        if len(cov_matrix) == 28:
            cov_matrix = _remove_last_column_of_upper_triangular(cov_matrix)
        result["covariance"] = _upper_triangular_to_full(np.array(cov_matrix))

    # Parse correlation matrix
    cor_matrix = []
    for line in lines:
        if line.strip().startswith("COR"):
            cor_matrix.extend([float(x) for x in line.split()[1:]])
    if cor_matrix:
        if len(cor_matrix) == 28:
            cor_matrix = _remove_last_column_of_upper_triangular(cor_matrix)
        result["correlation"] = _upper_triangular_to_full(np.array(cor_matrix))

    return result


def _physical_parameters_from_neocc(data: Dict[str, Any]) -> PhysicalParameters:
    """
    Build one-row PhysicalParameters from parsed NEOCC OEF data.

    OEF MAG line is (H, G); no uncertainties in NEOCC OEF. V-band per ESA doc.
    Ref: https://neo.ssa.esa.int/computer-access
    """
    mag = data.get("magnitude")
    if mag is not None and "H" in mag:
        h = float(mag["H"])
        g = float(mag["G"]) if mag.get("G") is not None else np.nan
        return PhysicalParameters.from_kwargs(
            H_v=[h],
            H_v_sigma=[np.nan],
            G=[g],
            G_sigma=[np.nan],
        )
    return PhysicalParameters.from_kwargs(
        H_v=[np.nan],
        H_v_sigma=[np.nan],
        G=[np.nan],
        G_sigma=[np.nan],
    )


[docs] def query_neocc( object_ids: Union[List, npt.ArrayLike], orbit_type: Literal["ke", "eq"] = "ke", orbit_epoch: Literal["middle", "present-day"] = "present-day", ) -> Orbits: """ Query ESA's Near-Earth Object Coordination Centre (NEOCC) database for orbital elements of the specified NEOs. Parameters ---------- object_ids : Union[List, npt.ArrayLike] Object IDs / designations recognizable by NEOCC. orbit_type : ["ke", "eq"] Type of orbital elements to query. orbit_epoch : ["middle", "present-day"] Epoch of the orbital elements to query. Returns ------- orbits : `~adam_core.orbits.Orbits` Orbits object containing the orbital elements of the specified NEOs. """ base_url = "https://neo.ssa.esa.int/PSDB-portlet/download" if orbit_type == "eq": raise NotImplementedError("Equinoctial elements are not supported yet.") if orbit_epoch == "middle": orbit_epoch = 0 elif orbit_epoch == "present-day": orbit_epoch = 1 else: raise ValueError(f"Invalid orbit epoch: {orbit_epoch}") orbits = Orbits.empty() for object_id in object_ids: # Clean object ID so that there are no spaces object_id = object_id.replace(" ", "") params = {"file": f"{object_id}.{orbit_type}{orbit_epoch}"} response = requests.get(base_url, params=params) response.raise_for_status() data = _parse_oef(response.text) if orbit_type == "ke" and "time_system" in data: time_scale = data["time_system"] if time_scale == "TDT": time_scale = "tt" else: raise ValueError(f"Unsupported time scale: {time_scale}") if data["header"]["refsys"] != "ECLM J2000": raise ValueError( f"Unsupported reference system: {data['header']['refsys']}" ) phys = _physical_parameters_from_neocc(data) orbit = Orbits.from_kwargs( orbit_id=[data["object_id"]], object_id=[data["object_id"]], coordinates=KeplerianCoordinates.from_kwargs( a=[data["elements"]["a"]], e=[data["elements"]["e"]], i=[data["elements"]["i"]], raan=[data["elements"]["node"]], ap=[data["elements"]["peri"]], M=[data["elements"]["M"]], time=Timestamp.from_mjd([data["epoch"]], scale=time_scale), covariance=CoordinateCovariances.from_matrix( data["covariance"].reshape( 1, 6, 6, ) ), frame="ecliptic", origin=Origin.from_kwargs(code=["SUN"]), ).to_cartesian(), physical_parameters=phys, ) orbits = qv.concatenate([orbits, orbit]) return orbits