Source code for adam_core.photometry.rotation.core

from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import numpy as np
import numpy.typing as npt
import pyarrow as pa
import quivr as qv
from scipy.stats import f as f_dist  # type: ignore[import-untyped]

from ...constants import Constants
from ...time import Timestamp
from ..magnitude_common import hg_phase_correction

if TYPE_CHECKING:
    from ...coordinates.cartesian import CartesianCoordinates
    from ...observations.detections import PointSourceDetections
    from ...observations.exposures import Exposures

# Speed of light in au/day. Use the canonical adam_core constant rather than a local
# literal to avoid drift.
LIGHT_SPEED_AU_PER_DAY = Constants.C
_HG_G_MIN = -0.25
_HG_G_MAX = 0.95
_HG_PHASE_MIN_DEG = 1.0
_HG_PHASE_MAX_DEG = 120.0
_HG_ARC_HALF_WIDTH_DEG = 0.5
_PRIOR_SIGMA_FLOOR = 1.0e-6


@dataclass(slots=True)
class _FitResult:
    frequency: float
    fourier_order: int
    coeffs: npt.NDArray[np.float64]
    residual_sigma: float
    rss: float
    df: int
    n_par: int
    n_fit: int
    n_clipped: int
    phase_c1_idx: int
    phase_c2_idx: int


@dataclass(slots=True)
class _DesignInfo:
    fixed: npt.NDArray[np.float64]
    n_filters: int
    phase_c1_idx: int
    phase_c2_idx: int


@dataclass(slots=True)
class _SessionSummary:
    n_sessions: int
    min_group_count: int
    median_session_span_days: float


@dataclass(slots=True)
class _FitWithPeriod:
    fit: _FitResult
    period_days: float
    period_hours: float
    is_period_doubled: bool


@dataclass(frozen=True, slots=True)
class _FourierProfile:
    name: str
    orders: tuple[int, ...]
    order_selection_confidence: float
    sigma_threshold_confidence: float
    valid_relative_uncertainty_max: float | None
    reliable_relative_multiplier: float | None
    reliable_absolute_hours: float | None


# Solver profile, built off the Greenstreet (2026) rotation-period method:
# search Fourier orders 2-6, pick the order by F-test at 90% confidence, cluster aliases
# at the 95% sigma threshold, and call a period "reliable" when its uncertainty is within
# max(2P, 7h).
PROFILES: dict[str, _FourierProfile] = {
    "default": _FourierProfile(
        name="default",
        orders=(2, 3, 4, 5, 6),
        order_selection_confidence=0.90,
        sigma_threshold_confidence=0.95,
        valid_relative_uncertainty_max=None,
        reliable_relative_multiplier=2.0,
        reliable_absolute_hours=7.0,
    ),
}


def _to_numpy_1d(
    values: pa.Array | pa.ChunkedArray | npt.ArrayLike,
) -> npt.NDArray[Any]:
    if isinstance(values, (pa.Array, pa.ChunkedArray)):
        return np.asarray(values.to_numpy(zero_copy_only=False))
    return np.asarray(values)


def _ordered_unique(values: npt.NDArray[np.object_]) -> list[str]:
    seen: set[str] = set()
    ordered: list[str] = []
    for value in values.tolist():
        label = "__missing__" if value is None else str(value)
        if label not in seen:
            seen.add(label)
            ordered.append(label)
    return ordered


def _normalized_string_labels(
    values: npt.NDArray[np.object_],
    *,
    missing_label: str,
) -> npt.NDArray[np.object_]:
    return np.asarray(
        [missing_label if value is None else str(value) for value in values.tolist()],
        dtype=object,
    )


def _validate_inputs(
    observations: RotationPeriodObservations,
) -> tuple[
    npt.NDArray[np.float64],
    npt.NDArray[np.float64],
    npt.NDArray[np.float64],
    npt.NDArray[np.float64],
    npt.NDArray[np.float64],
    npt.NDArray[np.object_],
    npt.NDArray[np.object_] | None,
    npt.NDArray[np.float64] | None,
]:
    n = len(observations)
    if n == 0:
        raise ValueError("observations must be non-empty")

    time = np.asarray(
        observations.time.rescale("tdb").mjd().to_numpy(False), dtype=np.float64
    )
    mag = np.asarray(observations.mag.to_numpy(zero_copy_only=False), dtype=np.float64)
    r_au = np.asarray(
        observations.r_au.to_numpy(zero_copy_only=False), dtype=np.float64
    )
    delta_au = np.asarray(
        observations.delta_au.to_numpy(zero_copy_only=False), dtype=np.float64
    )
    phase_angle = np.asarray(
        observations.phase_angle_deg.to_numpy(zero_copy_only=False), dtype=np.float64
    )

    if any(arr.shape != (n,) for arr in (time, mag, r_au, delta_au, phase_angle)):
        raise ValueError("all observation columns must be 1D and aligned")
    if not (
        np.all(np.isfinite(time))
        and np.all(np.isfinite(mag))
        and np.all(np.isfinite(r_au))
        and np.all(np.isfinite(delta_au))
        and np.all(np.isfinite(phase_angle))
    ):
        raise ValueError(
            "observations must contain finite time, mag, r_au, delta_au, and phase_angle_deg"
        )
    if np.any(r_au <= 0.0) or np.any(delta_au <= 0.0):
        raise ValueError("r_au and delta_au must be positive")

    filter_values = _to_numpy_1d(observations.filter)
    if filter_values.shape != (n,):
        raise ValueError("filter column must be 1D and aligned with observations")
    filter_labels = _normalized_string_labels(
        np.asarray(filter_values, dtype=object),
        missing_label="__missing_filter__",
    )

    session_labels = None
    session_values = _to_numpy_1d(observations.session_id)
    if session_values.shape != (n,):
        raise ValueError("session_id column must be 1D and aligned with observations")
    session_labels_raw = np.asarray(session_values, dtype=object)
    if np.any(
        np.asarray(
            [value is not None for value in session_labels_raw.tolist()], dtype=bool
        )
    ):
        session_labels = np.asarray(
            [
                None if value is None else str(value)
                for value in session_labels_raw.tolist()
            ],
            dtype=object,
        )

    mag_sigma = None
    raw_sigma = np.asarray(
        observations.mag_sigma.to_numpy(zero_copy_only=False), dtype=np.float64
    )
    if raw_sigma.shape != (n,):
        raise ValueError("mag_sigma column must be 1D and aligned with observations")
    if np.any(np.isfinite(raw_sigma)):
        mag_sigma = raw_sigma

    return (
        time,
        mag,
        r_au,
        delta_au,
        phase_angle,
        filter_labels,
        session_labels,
        mag_sigma,
    )


def _apply_light_time_correction(
    time_mjd_tdb: npt.NDArray[np.float64],
    delta_au: npt.NDArray[np.float64],
) -> npt.NDArray[np.float64]:
    corrected = np.asarray(time_mjd_tdb, dtype=np.float64) - (
        np.asarray(delta_au, dtype=np.float64) / LIGHT_SPEED_AU_PER_DAY
    )
    return np.asarray(corrected, dtype=np.float64)


def _build_fixed_design(
    filter_labels: npt.NDArray[np.object_],
    session_labels: npt.NDArray[np.object_] | None,
    phase_angle: npt.NDArray[np.float64],
) -> _DesignInfo:
    unique_filters = _ordered_unique(filter_labels)
    n = len(filter_labels)
    cols: list[npt.NDArray[np.float64]] = [np.ones(n, dtype=np.float64)]
    for label in unique_filters[1:]:
        cols.append((filter_labels == label).astype(np.float64))

    if session_labels is not None:
        normalized_sessions = _normalized_string_labels(
            np.asarray(session_labels, dtype=object),
            missing_label="__missing_session__",
        )
        for filter_label in unique_filters:
            filter_mask = filter_labels == filter_label
            ordered_sessions = _ordered_unique(normalized_sessions[filter_mask])
            for session_label in ordered_sessions[1:]:
                cols.append(
                    np.logical_and(
                        filter_mask, normalized_sessions == session_label
                    ).astype(np.float64)
                )

    phase_angle = np.asarray(phase_angle, dtype=np.float64)
    cols.append(phase_angle)
    cols.append(np.square(phase_angle))
    fixed = np.column_stack(cols).astype(np.float64, copy=False)
    return _DesignInfo(
        fixed=fixed,
        n_filters=len(unique_filters),
        phase_c1_idx=fixed.shape[1] - 2,
        phase_c2_idx=fixed.shape[1] - 1,
    )


def _summarize_sessions(
    time: npt.NDArray[np.float64],
    filter_labels: npt.NDArray[np.object_],
    session_labels: npt.NDArray[np.object_] | None,
) -> _SessionSummary:
    if session_labels is None:
        return _SessionSummary(
            n_sessions=0,
            min_group_count=0,
            median_session_span_days=0.0,
        )

    session_labels_norm = _normalized_string_labels(
        np.asarray(session_labels, dtype=object),
        missing_label="__missing_session__",
    )
    unique_sessions = _ordered_unique(session_labels_norm)
    unique_filters = _ordered_unique(filter_labels)
    group_counts: list[int] = []
    spans: list[float] = []
    for session_label in unique_sessions:
        session_mask = session_labels_norm == session_label
        session_times = np.asarray(time[session_mask], dtype=np.float64)
        if session_times.size > 0:
            spans.append(float(np.max(session_times) - np.min(session_times)))
        for filter_label in unique_filters:
            count = int(
                np.count_nonzero(
                    np.logical_and(session_mask, filter_labels == filter_label)
                )
            )
            if count > 0:
                group_counts.append(count)

    return _SessionSummary(
        n_sessions=len(unique_sessions),
        min_group_count=min(group_counts) if group_counts else 0,
        median_session_span_days=(
            float(np.median(np.asarray(spans, dtype=np.float64))) if spans else 0.0
        ),
    )


def _build_fourier_columns(
    t_rel: npt.NDArray[np.float64],
    frequency: float,
    fourier_order: int,
) -> npt.NDArray[np.float64]:
    omega_t = 2.0 * np.pi * float(frequency) * np.asarray(t_rel, dtype=np.float64)
    cols: list[npt.NDArray[np.float64]] = []
    for harmonic in range(1, int(fourier_order) + 1):
        angle = harmonic * omega_t
        cols.append(np.cos(angle))
        cols.append(np.sin(angle))
    return np.column_stack(cols).astype(np.float64, copy=False)


def _phase_prior_bounds(
    min_alpha_deg: float,
) -> tuple[tuple[float, float], tuple[float, float]]:
    alpha_center = float(np.clip(min_alpha_deg, _HG_PHASE_MIN_DEG, _HG_PHASE_MAX_DEG))
    alpha_arc = np.linspace(
        max(_HG_PHASE_MIN_DEG, alpha_center - _HG_ARC_HALF_WIDTH_DEG),
        min(_HG_PHASE_MAX_DEG, alpha_center + _HG_ARC_HALF_WIDTH_DEG),
        25,
        dtype=np.float64,
    )
    coeffs_min = np.polyfit(alpha_arc, hg_phase_correction(alpha_arc, _HG_G_MIN), deg=2)
    coeffs_max = np.polyfit(alpha_arc, hg_phase_correction(alpha_arc, _HG_G_MAX), deg=2)
    c1_min, c1_max = sorted((float(coeffs_min[1]), float(coeffs_max[1])))
    c2_min, c2_max = sorted((float(coeffs_min[0]), float(coeffs_max[0])))
    return (c1_min, c1_max), (c2_min, c2_max)


def _phase_prior_rows(
    n_par: int,
    design_info: _DesignInfo,
    min_phase_angle_deg: float,
) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]]:
    """Two weighted pseudo-observations constraining the phase-angle (H-G) coefficients.

    NOTE: these rows make every Fourier fit a RIDGE-REGULARIZED
    least-squares solve, not ordinary least squares. The reported RSS / residual_sigma
    / df / BIC / F-test are computed on the REAL observations only and exclude this
    prior penalty, so those statistics (and the order selection, sigma thresholds, and
    uncertainty intervals derived from them) are regularized/approximate, not exact OLS
    quantities. Treat them as such when interpreting the confidence surface.
    """
    (c1_min, c1_max), (c2_min, c2_max) = _phase_prior_bounds(min_phase_angle_deg)
    rows = np.zeros((2, n_par), dtype=np.float64)
    rows[0, design_info.phase_c1_idx] = 1.0
    rows[1, design_info.phase_c2_idx] = 1.0
    target = np.asarray(
        [0.5 * (c1_min + c1_max), 0.5 * (c2_min + c2_max)],
        dtype=np.float64,
    )
    sigma = np.asarray(
        [
            max(0.5 * abs(c1_max - c1_min), _PRIOR_SIGMA_FLOOR),
            max(0.5 * abs(c2_max - c2_min), _PRIOR_SIGMA_FLOOR),
        ],
        dtype=np.float64,
    )
    weights = 1.0 / np.square(sigma)
    return rows, target, weights


def _weighted_lstsq(
    design: npt.NDArray[np.float64],
    target: npt.NDArray[np.float64],
    weights: npt.NDArray[np.float64] | None,
) -> npt.NDArray[np.float64]:
    if weights is None:
        coeffs, *_ = np.linalg.lstsq(design, target, rcond=None)
    else:
        sqrt_w = np.sqrt(np.asarray(weights, dtype=np.float64))
        coeffs, *_ = np.linalg.lstsq(
            design * sqrt_w[:, None], target * sqrt_w, rcond=None
        )
    return np.asarray(coeffs, dtype=np.float64)


def _paper_sigma(
    residuals: npt.NDArray[np.float64],
    weights: npt.NDArray[np.float64] | None,
    *,
    n_obs: int,
    n_fit: int,
    n_par: int,
) -> tuple[float, float, int]:
    df = int(n_fit - n_par)
    if df <= 0:
        return float("inf"), float("inf"), df
    if weights is None:
        rss = float(np.sum(np.square(residuals)))
        sigma2 = rss / float(df)
        return float(np.sqrt(max(sigma2, 0.0))), rss, df

    w = np.asarray(weights, dtype=np.float64)
    rss = float(np.sum(w * np.square(residuals)))
    weight_sum = float(np.sum(w))
    if weight_sum <= 0.0:
        return float("inf"), float("inf"), df
    sigma2 = (float(n_obs) / weight_sum) * rss / float(df)
    return float(np.sqrt(max(sigma2, 0.0))), rss, df


def _fit_frequency(
    t_rel: npt.NDArray[np.float64],
    y: npt.NDArray[np.float64],
    design_info: _DesignInfo,
    frequency: float,
    fourier_order: int,
    *,
    clip_sigma: float,
    weights: npt.NDArray[np.float64] | None,
    max_clip_iterations: int = 8,
) -> _FitResult | None:
    n_obs = len(y)
    fixed = np.asarray(design_info.fixed, dtype=np.float64)
    n_par = int(fixed.shape[1] + 2 * int(fourier_order))
    if n_obs <= n_par:
        return None

    if weights is None:
        weights_real = None
    else:
        weights_real = np.asarray(weights, dtype=np.float64)

    min_phase = float(np.min(fixed[:, design_info.phase_c1_idx]))
    prior_rows, prior_target, prior_weights = _phase_prior_rows(
        n_par, design_info, min_phase
    )
    mask = np.ones(n_obs, dtype=bool)

    def _solve(
        idx: npt.NDArray[np.int64],
    ) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], float, float, int]:
        """Weighted, prior-augmented Fourier fit at the unmasked rows ``idx``."""
        fourier = _build_fourier_columns(t_rel[idx], frequency, fourier_order)
        design_real = np.concatenate([fixed[idx], fourier], axis=1)
        target_real = np.asarray(y[idx], dtype=np.float64)
        design = np.vstack([design_real, prior_rows])
        target = np.concatenate([target_real, prior_target])
        if weights_real is None:
            weights_aug = np.concatenate(
                [np.ones(target_real.size, dtype=np.float64), prior_weights]
            )
            real_weights = None
        else:
            weights_aug = np.concatenate([weights_real[idx], prior_weights])
            real_weights = weights_real[idx]
        coeffs = _weighted_lstsq(design, target, weights_aug)
        residuals = target_real - design_real @ coeffs
        sigma, rss, df = _paper_sigma(
            residuals, real_weights, n_obs=n_obs, n_fit=idx.size, n_par=n_par
        )
        return coeffs, residuals, sigma, rss, df

    def _result(
        idx: npt.NDArray[np.int64],
        coeffs: npt.NDArray[np.float64],
        sigma: float,
        rss: float,
        df: int,
    ) -> _FitResult:
        return _FitResult(
            frequency=float(frequency),
            fourier_order=int(fourier_order),
            coeffs=np.asarray(coeffs, dtype=np.float64),
            residual_sigma=float(sigma),
            rss=float(rss),
            df=int(df),
            n_par=int(n_par),
            n_fit=int(idx.size),
            n_clipped=int(n_obs - idx.size),
            phase_c1_idx=int(design_info.phase_c1_idx),
            phase_c2_idx=int(design_info.phase_c2_idx),
        )

    for _ in range(max(1, int(max_clip_iterations))):
        idx = np.flatnonzero(mask)
        if idx.size <= n_par:
            return None
        coeffs, residuals, sigma, rss, df = _solve(idx)
        if not np.isfinite(sigma):
            return None
        clip_mask = np.abs(residuals) <= float(clip_sigma) * sigma
        if np.all(clip_mask):
            return _result(idx, coeffs, sigma, rss, df)
        new_mask = mask.copy()
        new_mask[idx[~clip_mask]] = False
        if np.array_equal(new_mask, mask):
            break
        mask = new_mask

    idx = np.flatnonzero(mask)
    if idx.size <= n_par:
        return None
    coeffs, residuals, sigma, rss, df = _solve(idx)
    if not np.isfinite(sigma):
        return None
    return _result(idx, coeffs, sigma, rss, df)


def _fit_frequency_unclipped(
    t_rel: npt.NDArray[np.float64],
    y: npt.NDArray[np.float64],
    design_info: _DesignInfo,
    frequency: float,
    fourier_order: int,
    *,
    weights: npt.NDArray[np.float64] | None,
) -> _FitResult | None:
    n_obs = len(y)
    fixed = np.asarray(design_info.fixed, dtype=np.float64)
    n_par = int(fixed.shape[1] + 2 * int(fourier_order))
    if n_obs <= n_par:
        return None

    fourier = _build_fourier_columns(t_rel, frequency, fourier_order)
    design_real = np.concatenate([fixed, fourier], axis=1)
    target_real = np.asarray(y, dtype=np.float64)
    min_phase = float(np.min(fixed[:, design_info.phase_c1_idx]))
    prior_rows, prior_target, prior_weights = _phase_prior_rows(
        n_par, design_info, min_phase
    )
    design = np.vstack([design_real, prior_rows])
    target = np.concatenate([target_real, prior_target])
    if weights is None:
        weights_aug = np.concatenate([np.ones(n_obs, dtype=np.float64), prior_weights])
        real_weights = None
    else:
        real_weights = np.asarray(weights, dtype=np.float64)
        weights_aug = np.concatenate([real_weights, prior_weights])
    coeffs = _weighted_lstsq(design, target, weights_aug)
    residuals = target_real - design_real @ coeffs
    sigma, rss, df = _paper_sigma(
        residuals,
        real_weights,
        n_obs=n_obs,
        n_fit=n_obs,
        n_par=n_par,
    )
    if not np.isfinite(sigma):
        return None
    return _FitResult(
        frequency=float(frequency),
        fourier_order=int(fourier_order),
        coeffs=np.asarray(coeffs, dtype=np.float64),
        residual_sigma=float(sigma),
        rss=float(rss),
        df=int(df),
        n_par=int(n_par),
        n_fit=int(n_obs),
        n_clipped=0,
        phase_c1_idx=int(design_info.phase_c1_idx),
        phase_c2_idx=int(design_info.phase_c2_idx),
    )


def _f_test_confidence(small: _FitResult, large: _FitResult) -> float:
    if large.n_par <= small.n_par:
        return 0.0
    if not np.isfinite(small.residual_sigma) or not np.isfinite(large.residual_sigma):
        return 0.0
    df_small = int(small.df)
    df_large = int(large.df)
    if df_small <= 0 or df_large <= 0:
        return 0.0
    variance_small = float(small.residual_sigma) ** 2
    variance_large = float(large.residual_sigma) ** 2
    if variance_small <= variance_large or variance_large <= 0.0:
        return 0.0
    f_value = variance_small / variance_large
    if not np.isfinite(f_value) or f_value <= 1.0:
        return 0.0
    # The papers describe F-tests in terms of the fitted sigma values and
    # the number of fitted observations, so use a directional variance-ratio
    # confidence rather than a nested-model extra-sum-of-squares test.
    return float(f_dist.cdf(f_value, df_small, df_large))


def _fit_bic(fit: _FitResult) -> float:
    n = int(fit.n_fit)
    if n <= fit.n_par or fit.rss <= 0.0:
        return float("inf")
    return float(n * np.log(fit.rss / float(n)) + fit.n_par * np.log(float(n)))


def _select_order(
    candidate_fits: dict[int, _FitResult], required_confidence: float
) -> _FitResult:
    valid_orders = sorted(candidate_fits)
    for i, order in enumerate(valid_orders):
        candidate = candidate_fits[order]
        significantly_worse = False
        for higher_order in valid_orders[i + 1 :]:
            higher = candidate_fits[higher_order]
            if _f_test_confidence(candidate, higher) > float(required_confidence):
                significantly_worse = True
                break
        if not significantly_worse:
            return candidate
    return candidate_fits[valid_orders[-1]]


def _sigma_threshold_for_confidence(best_fit: _FitResult, confidence: float) -> float:
    if best_fit.df <= 0:
        return float("inf")
    f_critical = float(f_dist.ppf(float(confidence), best_fit.df, best_fit.df))
    if not np.isfinite(f_critical) or f_critical <= 0.0:
        return float("inf")
    return float(best_fit.residual_sigma * np.sqrt(f_critical))


def _count_local_extrema(
    coeffs: npt.NDArray[np.float64],
    fourier_order: int,
    dense_points: int = 2048,
) -> tuple[int, int]:
    phase = np.linspace(0.0, 1.0, dense_points, endpoint=False)
    periodic = np.zeros_like(phase)
    start = coeffs.size - 2 * int(fourier_order)
    for harmonic in range(1, int(fourier_order) + 1):
        idx = start + 2 * (harmonic - 1)
        periodic += coeffs[idx] * np.cos(2.0 * np.pi * harmonic * phase)
        periodic += coeffs[idx + 1] * np.sin(2.0 * np.pi * harmonic * phase)
    prev_vals = np.roll(periodic, 1)
    next_vals = np.roll(periodic, -1)
    maxima = (periodic > prev_vals) & (periodic >= next_vals)
    minima = (periodic < prev_vals) & (periodic <= next_vals)
    return int(np.count_nonzero(maxima)), int(np.count_nonzero(minima))


def _periodic_amplitude_from_coeffs(
    coeffs: npt.NDArray[np.float64],
    fourier_order: int,
    dense_points: int = 4096,
) -> float:
    """Peak-to-peak amplitude of the periodic (Fourier) part of a coefficient vector.

    Canonical amplitude helper for the Fourier path (start = len - 2*order
    convention), shared by ``_amplitude_from_fit``.
    """
    phase = np.linspace(0.0, 1.0, dense_points, endpoint=False)
    periodic = np.zeros_like(phase)
    start = coeffs.size - 2 * int(fourier_order)
    for harmonic in range(1, int(fourier_order) + 1):
        idx = start + 2 * (harmonic - 1)
        periodic += coeffs[idx] * np.cos(2.0 * np.pi * harmonic * phase)
        periodic += coeffs[idx + 1] * np.sin(2.0 * np.pi * harmonic * phase)
    return float(np.max(periodic) - np.min(periodic))


def _amplitude_from_fit(
    fit: _FitResult,
    dense_points: int = 4096,
) -> float:
    return _periodic_amplitude_from_coeffs(fit.coeffs, fit.fourier_order, dense_points)


def _fit_with_period(fit: _FitResult) -> _FitWithPeriod:
    n_maxima, _ = _count_local_extrema(fit.coeffs, fit.fourier_order)
    is_period_doubled = n_maxima == 1
    period_days = (2.0 if is_period_doubled else 1.0) / float(fit.frequency)
    return _FitWithPeriod(
        fit=fit,
        period_days=float(period_days),
        period_hours=float(period_days * 24.0),
        is_period_doubled=bool(is_period_doubled),
    )


# ---------------------------------------------------------------------------
# Public data model: rotation-period observations and results.
# ---------------------------------------------------------------------------


[docs] class RotationPeriodObservations(qv.Table): time = Timestamp.as_column() mag = qv.Float64Column() mag_sigma = qv.Float64Column(nullable=True) filter = qv.LargeStringColumn(nullable=True) session_id = qv.LargeStringColumn(nullable=True) r_au = qv.Float64Column() delta_au = qv.Float64Column() phase_angle_deg = qv.Float64Column()
[docs] @classmethod def from_point_source_observations( cls, detections: PointSourceDetections, exposures: Exposures, object_coords: CartesianCoordinates, ) -> RotationPeriodObservations: """Build observations from adam_core point-source detections + exposures. Links this table to the core adam_core observation primitives: one row per ``PointSourceDetections`` entry, with ``filter`` and the per-exposure observing geometry (heliocentric distance ``r_au``, observer distance ``delta_au``, and solar ``phase_angle_deg``) derived from the aligned ``Exposures`` and the object's heliocentric ``CartesianCoordinates``. ``object_coords`` must be heliocentric (origin=SUN) and the same length and order as ``detections``; ``detections.exposure_id`` is used to align each detection to its exposure. ``mag`` / ``r_au`` / ``delta_au`` / ``phase_angle_deg`` must be finite (and the distances positive) or a ``ValueError`` is raised. """ # Lazy import avoids a module-load cycle (the wrappers module imports this # one); the geometry pipeline lives in that adapter module. from .wrappers import ( build_rotation_period_observations_from_detections, ) return build_rotation_period_observations_from_detections( detections, exposures, object_coords )
[docs] class RotationPeriodResult(qv.Table): period_days = qv.Float64Column() period_hours = qv.Float64Column() frequency_cycles_per_day = qv.Float64Column() profile = qv.LargeStringColumn() period_verdict = qv.LargeStringColumn() # LCDB-U-style reliability code as a STRING ("3"/"2"/"1", highest first). Kept a # string (not int) to mirror LCDB U codes and stay forward-compatible with # qualified codes (e.g. "1+"); do NOT sort or compare it numerically. reliability_code = qv.LargeStringColumn() confidence_flags = qv.LargeListColumn(pa.large_string(), nullable=True) insufficiency_reasons = qv.LargeListColumn(pa.large_string(), nullable=True) is_valid = qv.BooleanColumn() is_reliable = qv.BooleanColumn() period_lower_days = qv.Float64Column(nullable=True) period_upper_days = qv.Float64Column(nullable=True) relative_period_uncertainty = qv.Float64Column(nullable=True) alternate_period_days = qv.LargeListColumn(pa.float64(), nullable=True) fourier_period_days = qv.Float64Column(nullable=True) fourier_order = qv.Int64Column(nullable=True) fourier_sigma_threshold = qv.Float64Column(nullable=True) fourier_phase_c1 = qv.Float64Column(nullable=True) fourier_phase_c2 = qv.Float64Column(nullable=True) residual_sigma_mag = qv.Float64Column(nullable=True) fourier_is_valid = qv.BooleanColumn(nullable=True) fourier_is_reliable = qv.BooleanColumn(nullable=True) fourier_alternate_period_days = qv.LargeListColumn(pa.float64(), nullable=True) phase_coverage_fraction = qv.Float64Column(nullable=True) n_rotations_spanned = qv.Float64Column(nullable=True) amplitude_snr = qv.Float64Column(nullable=True) n_significant_aliases = qv.Int64Column(nullable=True) n_observations = qv.Int64Column() n_fit_observations = qv.Int64Column() n_clipped = qv.Int64Column() n_filters = qv.Int64Column() n_sessions = qv.Int64Column() used_session_offsets = qv.BooleanColumn() is_period_doubled = qv.BooleanColumn()
[docs] @classmethod def single_insufficient( cls, *, reasons: list[str], confidence_flags: list[str] | None = None, n_observations: int = 0, n_filters: int = 0, n_sessions: int = 0, profile: str = "default", ) -> "RotationPeriodResult": """One-row ``insufficient_data`` result: NaN period, every nullable diagnostic ``None``. The canonical builder for the insufficient verdict. Used both by the solver's early-exit path and by the detection wrappers when an object cannot be solved, so a grouped solve returns one row per object id rather than silently dropping failures. ``period_verdict``/``reliability_code`` are the contract constants (``"insufficient_data"`` / ``"1"``). """ return cls.from_kwargs( period_days=[float("nan")], period_hours=[float("nan")], frequency_cycles_per_day=[float("nan")], profile=[profile], period_verdict=["insufficient_data"], reliability_code=["1"], confidence_flags=[list(confidence_flags or [])], insufficiency_reasons=[list(reasons)], is_valid=[False], is_reliable=[False], period_lower_days=[None], period_upper_days=[None], relative_period_uncertainty=[None], alternate_period_days=[[]], fourier_period_days=[None], fourier_order=[None], fourier_sigma_threshold=[None], fourier_phase_c1=[None], fourier_phase_c2=[None], residual_sigma_mag=[None], fourier_is_valid=[None], fourier_is_reliable=[None], fourier_alternate_period_days=[[]], phase_coverage_fraction=[None], n_rotations_spanned=[None], amplitude_snr=[None], n_significant_aliases=[None], n_observations=[int(n_observations)], n_fit_observations=[0], n_clipped=[0], n_filters=[int(n_filters)], n_sessions=[int(n_sessions)], used_session_offsets=[False], is_period_doubled=[False], )
[docs] class GroupedRotationPeriodResults(qv.Table): object_id = qv.LargeStringColumn() result = RotationPeriodResult.as_column()
# --------------------------------------------------------------------------- # Period-recovery scoring metrics (standard-candle validation). All periods in # hours; recovered and LCDB truth are both synodic (no sidereal correction). # --------------------------------------------------------------------------- HARMONIC_FACTORS: list[float] = [ 0.25, 1.0 / 3.0, 0.5, 2.0 / 3.0, 0.75, 1.0, 4.0 / 3.0, 1.5, 2.0, 3.0, 4.0, ] """Harmonic factors applied to the recovered period when diagnosing aliases."""
[docs] def relative_error_pct(p_rec: float, p_true: float) -> float: """Strict relative error of a recovered period, in percent. Parameters ---------- p_rec : float Recovered period (hours). p_true : float True (LCDB) period (hours); must be positive. Returns ------- float ``100 * |p_rec - p_true| / p_true``. """ return float(100.0 * abs(float(p_rec) - float(p_true)) / float(p_true))
[docs] def harmonic_adjusted_error_pct(p_rec: float, p_true: float) -> tuple[float, float]: """Harmonic-tolerant relative error and the best-fitting harmonic factor. Minimises ``|p_rec * f - p_true| / p_true`` over :data:`HARMONIC_FACTORS`. Parameters ---------- p_rec : float Recovered period (hours). p_true : float True (LCDB) period (hours); must be positive. Returns ------- tuple of (float, float) The minimum harmonic-adjusted error in percent, and the factor ``f`` that achieves it. """ factors = np.asarray(HARMONIC_FACTORS, dtype=np.float64) errors = np.abs(float(p_rec) * factors - float(p_true)) / float(p_true) best = int(np.argmin(errors)) return float(100.0 * errors[best]), float(factors[best])
[docs] def alias_bucket(best_factor: float) -> str: """Label the harmonic-alias bucket for the best-fitting factor. Parameters ---------- best_factor : float The harmonic factor returned by :func:`harmonic_adjusted_error_pct`. Returns ------- str One of ``"1x"``, ``"1/4x"``, ``"1/3x"``, ``"1/2x"``, ``"2/3x"``, ``"3/4x"``, ``"4/3x"``, ``"3/2x"``, ``"2x"``, ``"3x"``, ``"4x"``, or ``"other"`` if no factor is within 5% of ``best_factor``. """ labels: list[tuple[float, str]] = [ (1.0, "1x"), (0.25, "1/4x"), (1.0 / 3.0, "1/3x"), (0.5, "1/2x"), (2.0 / 3.0, "2/3x"), (0.75, "3/4x"), (4.0 / 3.0, "4/3x"), (1.5, "3/2x"), (2.0, "2x"), (3.0, "3x"), (4.0, "4x"), ] factor = float(best_factor) for value, label in labels: if abs(factor - value) <= 0.05 * value: return label return "other"
[docs] def within_tolerance(p_rec: float, p_true: float, tolerance_fraction: float) -> bool: """Whether the raw relative error is within the fixture's tolerance. Uses the strict :func:`relative_error_pct` (no harmonic adjustment) compared against the per-fixture ``tolerance_fraction`` from the npz. Parameters ---------- p_rec : float Recovered period (hours). p_true : float True (LCDB) period (hours); must be positive. tolerance_fraction : float Allowed fractional error (e.g. ``0.01`` for a tight U=3 fixture). Returns ------- bool ``True`` if ``relative_error_pct(p_rec, p_true) <= tolerance_fraction * 100``. """ return bool(relative_error_pct(p_rec, p_true) <= float(tolerance_fraction) * 100.0)
[docs] def near_day_alias( p_rec_hours: float, p_true_hours: float, tolerance_fraction: float = 0.02, ) -> bool: """Whether the recovered frequency is a diurnal-cadence alias of the truth. Flags lock-on to a day-aliased frequency (cycles per day): the recovered frequency sits within ``tolerance_fraction`` of ``f_true +/- n`` for ``n`` in ``{1, 2}`` cycles/day, i.e. ``|f_rec - (f_true +/- n)| / f_true <= tol``. Parameters ---------- p_rec_hours : float Recovered period (hours); must be positive. p_true_hours : float True (LCDB) period (hours); must be positive. tolerance_fraction : float, optional Fractional tolerance on the day-aliased frequency match (default 0.02). Returns ------- bool ``True`` if a day-alias relationship holds. """ p_rec = float(p_rec_hours) p_true = float(p_true_hours) if p_rec <= 0.0 or p_true <= 0.0: return False f_rec = 24.0 / p_rec f_true = 24.0 / p_true tol = float(tolerance_fraction) for n in (1, 2): for aliased in (f_true + n, f_true - n): if aliased <= 0.0: continue if abs(f_rec - aliased) / f_true <= tol: return True return False