import warnings
from typing import List, Tuple, Union
import numpy as np
import numpy.typing as npt
import pyarrow as pa
import quivr as qv
from scipy import stats
from .cartesian import CartesianCoordinates
from .cometary import CometaryCoordinates
from .keplerian import KeplerianCoordinates
from .spherical import SphericalCoordinates
CoordinateType = Union[
CartesianCoordinates,
CometaryCoordinates,
KeplerianCoordinates,
SphericalCoordinates,
]
SUPPORTED_COORDINATES = (
CartesianCoordinates,
CometaryCoordinates,
KeplerianCoordinates,
SphericalCoordinates,
)
__all__ = [
"Residuals",
"calculate_chi2",
"_batch_coords_and_covariances",
]
def apply_cosine_latitude_correction(
lat: npt.NDArray[np.float64],
residuals: npt.NDArray[np.float64],
covariances: npt.NDArray[np.float64],
) -> Tuple[npt.NDArray[np.float64], npt.NDArray[np.float64]]:
"""
Apply a correction factor of cosine latitude to the residuals and covariance matrix.
This is designed to account for the fact that longitudes get smaller as you approach
the poles.
Parameters
----------
lat : `~numpy.ndarray` (N)
Latitudes in degrees.
residuals : `~numpy.ndarray` (N, D)
Spherical residuals.
covariances : `~numpy.ndarray` (N, D, D)
Covariance matrices for spherical residuals.
Returns
-------
residuals : `~numpy.ndarray` (N, D)
Spherical residuals with the correction factor applied.
covariances : `~numpy.ndarray` (N, D, D)
Covariance matrices for spherical residuals with the correction factor applied.
"""
N = len(lat)
cos_lat = np.cos(np.radians(lat))
identity = np.identity(6, dtype=np.float64)
# Populate the diagonal of the matrix with cos(latitude) for
# the longitude and longitudinal velocity dimensions
diag = np.ones((N, 6))
diag[:, 1] = cos_lat
diag[:, 4] = cos_lat
# Calculate the cos(latitude) factor for the covariance matrix
cos_lat_cov = np.einsum("kj,ji->kij", diag, identity, order="C")
# Apply the cos(latitude) factor to the residuals in longitude
# and longitudinal velocity
residuals[:, 1] *= cos_lat
residuals[:, 4] *= cos_lat
# Identify locations where the covariance matrix is NaN and set
# those values to 0.0
nan_cov = np.isnan(covariances)
covariances_masked = np.where(nan_cov, 0.0, covariances)
# Apply the cos(latitude) factor to the covariance matrix.
covariances_masked = (
cos_lat_cov @ covariances_masked @ cos_lat_cov.transpose(0, 2, 1)
)
# Set the covariance matrix to nan where it was nan before.
# Note that when we apply cos(latitude) to the covariance matrix in the previous statement,
# we are only modifying the longitude and longitudinal velocity dimensions. The remaining
# dimensions are left unchanged which is why we can use a mask to reset the
# the missing values in the resulting matrix to NaN.
covariances = np.where(nan_cov, np.nan, covariances_masked)
return residuals, covariances
def bound_longitude_residuals(
observed: npt.NDArray[np.float64], residuals: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
"""
Bound the longitude residuals to the range [-180, 180] degrees and adjust the
signs of the residuals that cross the 0/360 degree boundary. By convention, we define
residuals that cross from <360 to >0 as a positive residual (355 - 5 = +10) and cross
from >0 to <360 as a negative residual (5 - 355 = -10).
Parameters
----------
observed : `~numpy.ndarray` (N, D)
Observed coordinates in degrees. We use the observed coordinates to
determine whether the residuals cross the 0/360 degree boundary.
residuals : `~numpy.ndarray` (N, D)
Residuals in degrees.
Returns
-------
residuals : `~numpy.ndarray` (N, D)
Residuals wrapped to the range [-180, 180] degrees.
"""
# Extract the observed longitude and residuals
observed_longitude = observed[:, 1]
longitude_residual = residuals[:, 1]
# Identify residuals that exceed a half circle
longitude_residual_g180 = longitude_residual > 180
longitude_residual_l180 = longitude_residual < -180
# Wrap the residuals to the range [-180, 180] degrees
longitude_residual = np.where(
longitude_residual_g180, longitude_residual - 360, longitude_residual
)
longitude_residual = np.where(
longitude_residual_l180, longitude_residual + 360, longitude_residual
)
# Adjust the signs of the residuals that cross the 0/360 degree boundary
# We want it so that crossing from <360 to >0 is a positive residual (355 - 5 = +10)
# and crossing from >0 to <360 is a negative residual (5 - 355 = -10)
longitude_residual = np.where(
longitude_residual_g180 & (observed_longitude > 180),
-longitude_residual,
longitude_residual,
)
longitude_residual = np.where(
longitude_residual_l180 & (observed_longitude < 180),
-longitude_residual,
longitude_residual,
)
residuals[:, 1] = longitude_residual
return residuals
[docs]
class Residuals(qv.Table):
values = qv.ListColumn(pa.float64(), nullable=True)
chi2 = qv.Float64Column(nullable=True)
dof = qv.Int64Column(nullable=True)
probability = qv.Float64Column(nullable=True)
[docs]
@classmethod
def calculate(
cls,
observed: CoordinateType,
predicted: CoordinateType,
custom_coordinates: bool = False,
use_predicted_covariance: bool = True,
) -> "Residuals":
"""
Calculate the residuals between the observed and predicted coordinates. Residuals
are defined as the observed coordinates minus the predicted coordinates.
The covariance used for chi2 is the sum of the observed covariance and (optionally)
the predicted covariance. If a single predicted coordinate is provided, it is
broadcast to the same length as the observed coordinates.
Parameters
----------
observed : CoordinateType (N, D)
Observed coordinates.
predicted : CoordinateType (N, D) or (1, D)
Predicted coordinates. If a single coordinate is provided, it is broadcasted
to the same shape as the observed coordinates.
custom_coordinates : bool, optional
If True, the coordinate class will not be checked against the supported coordinate classes
included in adam_core.
use_predicted_covariance : bool, optional
If True, include predicted covariance in the chi2 calculation (i.e., use Σ_obs + Σ_pred).
If False, use only the observed covariance (Σ_obs).
Returns
-------
residuals : `~adam_core.coordinates.residuals.Residuals`
Residuals between the observed and predicted coordinates.
"""
if not custom_coordinates:
if not isinstance(observed, SUPPORTED_COORDINATES):
raise TypeError(
f"Observed coordinates must be one of {SUPPORTED_COORDINATES}, not {type(observed)}."
)
if not isinstance(predicted, SUPPORTED_COORDINATES):
raise TypeError(
f"Predicted coordinates must be one of {SUPPORTED_COORDINATES}, not {type(predicted)}."
)
if type(observed) is not type(predicted):
raise TypeError(
"Observed and predicted coordinates must be the same type, "
f"not {type(observed)} and {type(predicted)}."
)
if (observed.origin != predicted.origin).all():
raise ValueError(
"Observed and predicted coordinates must have the same origin."
)
if observed.frame != predicted.frame:
raise ValueError(
f"Observed ({observed.frame}) and predicted ({predicted.frame}) coordinates must have the same frame."
)
# Extract the observed and predicted values
observed_values = observed.values
observed_covariances = observed.covariance.to_matrix()
predicted_values = predicted.values
# Broadcast predicted values to match observed length if needed
N, D = observed_values.shape
if len(predicted_values) == 1:
predicted_values = np.broadcast_to(predicted_values, (N, D))
elif len(predicted_values) != N:
raise ValueError(
f"Predicted coordinates must have length 1 or match observed length ({N}), got {len(predicted_values)}."
)
# Predicted covariances (optional)
if use_predicted_covariance:
try:
predicted_covariances = predicted.covariance.to_matrix()
except Exception:
predicted_covariances = np.full(
(
len(predicted),
observed_covariances.shape[1],
observed_covariances.shape[2],
),
np.nan,
)
# Broadcast to N if needed
if predicted_covariances.shape[0] == 1 and N > 1:
predicted_covariances = np.broadcast_to(
predicted_covariances, observed_covariances.shape
)
elif predicted_covariances.shape[0] != N:
if len(predicted) == 1:
predicted_covariances = np.broadcast_to(
predicted_covariances, observed_covariances.shape
)
else:
raise ValueError(
f"Predicted covariance length must be 1 or {N}, got {predicted_covariances.shape[0]}"
)
# Replace NaNs in predicted covariance with 0 so missing entries contribute nothing
predicted_covariances = np.where(
np.isnan(predicted_covariances), 0.0, predicted_covariances
)
else:
predicted_covariances = np.zeros_like(observed_covariances)
# Create the output arrays
p = np.empty(N, dtype=np.float64)
chi2s = np.empty(N, dtype=np.float64)
# Calculate the degrees of freedom for every coordinate
dof = D - np.sum(np.isnan(observed_values), axis=1)
# Calculate the array of residuals
residuals = observed_values - predicted_values
# If the coordinates are spherical then we do some extra work:
# 1. Bound residuals in longitude to the range [-180, 180] degrees and
# adjust the signs of the residuals that cross the 0/360 degree boundary.
# 2. Apply the cos(latitude) factor to the residuals in longitude and longitudinal
# velocity. Apply the same scaling to covariances.
if isinstance(observed, SphericalCoordinates):
# Bound the longitude residuals to the range [-180, 180] degrees
residuals = bound_longitude_residuals(observed_values, residuals)
# Apply the cos(latitude) factor to the residuals and observed covariance
residuals, observed_covariances = apply_cosine_latitude_correction(
observed_values[:, 2], residuals, observed_covariances
)
# Apply the same scaling to the predicted covariances (without touching residuals again)
_, predicted_covariances = apply_cosine_latitude_correction(
observed_values[:, 2], np.zeros_like(residuals), predicted_covariances
)
# Total covariance = observed + predicted (predicted may be zeros if disabled)
total_covariances = observed_covariances + predicted_covariances
# Batch the coordinates and covariances into groups that have the same
# number of dimensions that have missing values (represented by NaNs)
(
batch_indices,
batch_dimensions,
batch_coords,
batch_covariances,
) = _batch_coords_and_covariances(observed_values, total_covariances)
for indices, dimensions, coords, covariances in zip(
batch_indices, batch_dimensions, batch_coords, batch_covariances
):
if not np.all(np.isnan(covariances)):
# Filter residuals by dimensions that have values
residuals_i = residuals[:, dimensions]
# Then filter by rows that belong to this batch
residuals_i = residuals_i[indices, :]
# Calculate the chi2 for each coordinate (this is actually
# calculating mahalanobis distance squared -- both are equivalent
# when the covariance matrix is diagonal, mahalanobis distance is more
# general as it allows for covariates between dimensions)
chi2_values = calculate_chi2(residuals_i, covariances)
# For a normally distributed random variable, the mahalanobis distance
# squared in D dimesions follows a chi2 distribution with D degrees of freedom.
# So for each coordinate, calculate the probability that you would
# get a chi2 value greater than or equal to that coordinate's chi2 value.
# At a residual of zero this probability is 1.0, and at a residual of
# 1 sigma (for 1 degree of freedom) this probability is ~0.3173.
p[indices] = 1 - stats.chi2.cdf(chi2_values, dof[indices])
# Set the chi2 for each coordinate
chi2s[indices] = chi2_values
else:
# If the covariance matrix is all NaNs, then the chi2 is NaN
chi2s[indices] = np.nan
p[indices] = np.nan
return cls.from_kwargs(
values=residuals.tolist(),
chi2=chi2s,
dof=dof,
probability=p,
)
[docs]
def to_array(self) -> npt.NDArray[np.float64]:
"""
Convert the residuals to a numpy array.
Returns
-------
residuals : `~numpy.ndarray` (N, D)
Array of residuals.
"""
return np.stack(self.values.to_numpy(zero_copy_only=False))
[docs]
def calculate_chi2(
residuals: npt.NDArray[np.float64], covariances: npt.NDArray[np.float64]
) -> npt.NDArray[np.float64]:
"""
Calculate the vectorized normalized squared residual for each residual and covariance pair.
This normalized residual is equivalent to the Mahalanobis distance squared.
For residuals with no covariance (all non-diagonal covariance elements are zero) this
is exactly equivalent to chi2.
If the off-diagonal covariance elements (the covariate terms) are missing (represented by NaNs),
then they will assumed to be 0.0.
Parameters
----------
residuals : `~numpy.ndarray` (N, D)
Array of N residuals in D dimensions (observed - predicted).
covariances : `~numpy.ndarray` (N, D, D)
Array of N covariance matrices in D dimensions.
Returns
-------
chi2 : `~numpy.ndarray` (N)
Array of chi2 values for each residual and covariance pair.
References
----------
[1] Carpino, M. et al. (2003). Error statistics of asteroid optical astrometric observations.
Icarus, 166, 248-270. https://doi.org/10.1016/S0019-1035(03)00051-4
"""
# Raise error if any of the diagonal elements are nan
if np.any(np.isnan(np.diagonal(covariances, axis1=1, axis2=2))):
raise ValueError("Covariance matrix has NaNs on the diagonal.")
# Warn if any of the non-diagonal elements are nan
if np.any(np.isnan(covariances)):
warnings.warn(
"Covariance matrix has NaNs on the off-diagonal (these will be assumed to be 0.0).",
UserWarning,
)
covariances = np.where(np.isnan(covariances), 0.0, covariances)
W = np.linalg.inv(covariances)
chi2 = np.einsum("ij,ji->i", np.einsum("ij,ijk->ik", residuals, W), residuals.T)
return chi2
def calculate_reduced_chi2(residuals: Residuals, parameters: int) -> float:
"""
Calculate the reduced chi2 for a set of residuals.
Parameters
----------
residuals : `~adam_core.coordinates.residuals.Residuals`
Residuals.
parameters : int
Number of parameters in the model.
Returns
-------
reduced_chi2 : float
Reduced chi2.
"""
chi2_total = residuals.chi2.to_numpy().sum()
dof_total = residuals.dof.to_numpy().sum() - parameters
return chi2_total / dof_total
def _batch_coords_and_covariances(
coords: npt.NDArray[np.float64], covariances: npt.NDArray[np.float64]
) -> Tuple[
List[npt.NDArray[np.float64]],
List[npt.NDArray[np.float64]],
List[npt.NDArray[np.float64]],
List[npt.NDArray[np.float64]],
]:
"""
Batch coordinates and covariances into groups that have the same
number of dimensions that have missing values (represented by NaNs).
Parameters
----------
coords : `~numpy.ndarray` (N, D)
Array of N coordinates in D dimensions.
covariances : `~numpy.ndarray` (N, D, D)
Array of N covariance matrices in D dimensions.
Returns
-------
batch_indices : List[`~numpy.ndarray` (<=N)]
List of arrays of indices into the original arrays that correspond to
coordinates with the same number of dimensions that have missing values.
batch_dimensions : List[`~numpy.ndarray` (<=D)]
List of arrays of dimensions that have values for each batch.
batch_coords : List[`~numpy.ndarray` (<=N, <=D)]
Array of N coordinates in D dimensions with missing dimensions removed.
batch_covariances : List[`~numpy.ndarray` (<=N, <=D, <=D)]
Array of N covariance matrices in D dimensions with missing dimensions removed.
"""
N, D = coords.shape
assert covariances.shape == (N, D, D)
# Find the indices of the coordinates that have the same missing dimensions
# (if any) and select the coordinates and covariances for those indices
nans = np.isnan(coords)
batch_masks = np.unique(nans, axis=0)
indices = np.arange(0, N, dtype=np.int64)
batch_indices: List[np.ndarray] = []
batch_dimensions: List[np.ndarray] = []
batch_coords: List[np.ndarray] = []
batch_covariances: List[np.ndarray] = []
for batch in batch_masks:
# Find the indices of the coordinates that have the same missing dimensions
# and select the coordinates and covariances for those indices
batch_mask_i = indices[np.where(np.all(nans == batch, axis=1))[0]]
coords_i = coords[batch_mask_i]
covariances_i = covariances[batch_mask_i]
# Remove the missing dimensions from the coordinates and covariances
dimensions_with_values = np.where(~batch)[0]
coords_i = coords_i[:, dimensions_with_values]
covariances_i = covariances_i[:, dimensions_with_values, :]
covariances_i = covariances_i[:, :, dimensions_with_values]
batch_indices.append(batch_mask_i)
batch_dimensions.append(dimensions_with_values)
batch_coords.append(coords_i)
batch_covariances.append(covariances_i)
return batch_indices, batch_dimensions, batch_coords, batch_covariances