"""
Time utilities and a fast, integer-backed timestamp representation.
This module defines `Timestamp`, a quivr table storing times as integer MJD days plus
integer nanoseconds within the day, tagged with a time scale (`tai`, `tt`, `utc`, `tdb`, `ut1`).
`Timestamp.rescale()` implements fast conversions between supported scales. For UT1, we
delegate to astropy because UT1 requires IERS tables and interpolation.
### Empirical validation summary (2026-01)
We validated the impact of the `rescale()` implementation choice (default implementation vs
an astropy-backed baseline) on an Asteroid Institute “real world” pipeline:
- Rubin X05 MPC observation timestamps (UTC) + RA/Dec astrometry
- SBDB orbit queries (for known objects)
- Ephemeris generation with `adam_assist.ASSISTPropagator`
- Residuals computed via `adam_core.coordinates.residuals.Residuals` on equatorial spherical
coordinates (RA/Dec as lon/lat)
- Two runs: default `Timestamp.rescale` vs a global override where `Timestamp.rescale`
dispatches to `Timestamp.rescale_astropy` for all internal conversions
Results (largest run: 500 objects, max 100 observations/object; 43,919 ephemeris points):
- Predicted position delta (default vs astropy baseline): max ~0.001 mas.
- Residuals to Rubin astrometry (arcsec): p50 ~0.031, p90 ~0.088 for both methods.
- Residual delta (default - baseline): ~0 mas at p50/p90 (no measurable change).
We also confirmed these UTC observation times exhibit a default-vs-astropy UTC→TDB offset
in the expected tens-of-microseconds range (~14.6–21.8 µs for a sampled subset).
"""
from __future__ import annotations
import hashlib
from typing import Callable
import astropy.time
import erfa
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import quivr as qv
SCALES = {
"tai",
"tt",
"ut1",
"utc",
"tdb",
}
# The Modified Julian Date of the J2000 epoch in TDB scale:
_J2000_TDB_MJD = 51544.5
# Time constants
_SECONDS_IN_DAY = 86_400
_NANOS_IN_DAY = 86_400_000_000_000 # int, exact
_NANOS_IN_DAY_F64 = float(_NANOS_IN_DAY) # for pyarrow float division
_TAI_TT_NS_CORRECTION = 32_184_000_000 # TT = TAI + 32.184s
# ERFA converters used by `_leap_seconds_correction`:
# They accept full-day and fractional-day JD arrays and return the converted pair.
_ErfaDayFracConverter = Callable[
[np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray]
]
[docs]
class Timestamp(qv.Table):
# Scale, the rate at which time passes:
scale = qv.StringAttribute(default="tai")
# Days since MJD epoch (1858-11-17T00:00:00):
days = qv.Int64Column()
# Nanos since start of day:
nanos = qv.Int64Column()
[docs]
def micros(self) -> pa.Int64Array:
return pc.divide(self.nanos, 1_000)
[docs]
def millis(self) -> pa.Int64Array:
return pc.divide(self.nanos, 1_000_000)
[docs]
def seconds(self) -> pa.Int64Array:
return pc.divide(self.nanos, 1_000_000_000)
[docs]
def key(self, *, scale: str | None = "tdb") -> np.ndarray:
"""
Return an int64 key for each timestamp: (days * NANOS_IN_DAY + nanos).
This is useful for fast grouping/uniquing and as a stable cache key when paired
with a specific time scale.
"""
if len(self) == 0:
return np.empty(0, dtype=np.int64)
t = self if scale is None else self.rescale(scale)
days = t.days.to_numpy(zero_copy_only=False).astype(np.int64)
nanos = t.nanos.to_numpy(zero_copy_only=False).astype(np.int64)
return days * _NANOS_IN_DAY + nanos
[docs]
def signature(self, *, scale: str | None = "tdb") -> tuple[int, int, int, int]:
"""
Return a cheap signature for this Timestamp array.
The signature is (n, first_key, last_key, sum_mod) where keys are produced by `key()`.
"""
n = int(len(self))
if n == 0:
return 0, 0, 0, 0
key = self.key(scale=scale)
first = int(key[0])
last = int(key[-1])
sum_mod = int(np.sum(key, dtype=np.int64) & np.int64(0x7FFF_FFFF_FFFF_FFFF))
return n, first, last, sum_mod
[docs]
def cache_digest(self, *, scale: str | None = "tdb") -> int:
"""
Return an order-sensitive 64-bit digest of timestamp keys.
This is intended for cache keys where row alignment matters.
"""
if len(self) == 0:
return 0
key = np.asarray(self.key(scale=scale), dtype="<i8")
payload = np.ascontiguousarray(key).view(np.uint8)
digest = hashlib.blake2b(payload, digest_size=8).digest()
return int.from_bytes(digest, byteorder="little", signed=False)
[docs]
def mjd(self) -> pa.lib.DoubleArray:
return pc.add(self.days, self.fractional_days())
[docs]
def jd(self) -> pa.lib.DoubleArray:
return pc.add(self.mjd(), 2400000.5)
[docs]
def et(self) -> pa.lib.DoubleArray:
"""
Returns the times as ET seconds in a pyarrow array.
"""
tdb = self.rescale("tdb")
mjd = tdb.mjd()
return pc.multiply(pc.subtract(mjd, _J2000_TDB_MJD), _SECONDS_IN_DAY)
[docs]
def to_numpy(self) -> np.ndarray:
"""
Returns the times as TDB MJDs in a numpy array.
"""
return self.rescale("tdb").mjd().to_numpy(False)
[docs]
def to_iso8601(self) -> pa.lib.StringArray:
"""
Returns the times as ISO 8601 strings in a pyarrow array.
"""
if len(self) == 0:
return pa.array([], type=pa.string())
return pa.array(self.to_astropy().isot, type=pa.string())
[docs]
@classmethod
def from_iso8601(
cls, iso: pa.lib.StringArray | list[str], scale="utc"
) -> Timestamp:
"""
Create a Timestamp from ISO 8601 strings (for example, '2020-01-02T14:15:16').
"""
return cls.from_astropy(astropy.time.Time(iso, format="isot", scale=scale))
[docs]
@classmethod
def from_mjd(cls, mjd: pa.lib.DoubleArray, scale: str = "tai") -> Timestamp:
days = pc.floor(mjd)
fractional_days = pc.subtract(mjd, days)
days = pc.cast(days, pa.int64())
nanos = pc.cast(
pc.round(pc.multiply(fractional_days, _NANOS_IN_DAY)), pa.int64()
)
return cls.from_kwargs(days=days, nanos=nanos, scale=scale)
[docs]
@classmethod
def from_jd(cls, jd: pa.lib.DoubleArray, scale: str = "tai") -> Timestamp:
return cls.from_mjd(pc.subtract(jd, 2400000.5), scale)
[docs]
@classmethod
def from_et(cls, et: pa.lib.DoubleArray, scale: str = "tdb") -> Timestamp:
return cls.from_mjd(pc.divide(et, 86400), scale)
[docs]
def fractional_days(self) -> pa.lib.DoubleArray:
# IMPORTANT: pyarrow integer / integer division truncates, so ensure float divisor.
return pc.divide(self.nanos, _NANOS_IN_DAY_F64)
[docs]
def rounded(self, precision: str = "ns") -> Timestamp:
if precision == "ns":
return self
elif precision == "us":
nanos = pc.multiply(self.micros(), 1_000)
elif precision == "ms":
nanos = pc.multiply(self.millis(), 1_000_000)
elif precision == "s":
nanos = pc.multiply(self.seconds(), 1_000_000_000)
else:
raise ValueError(f"Unsupported precision: {precision}")
return self.set_column("nanos", nanos)
[docs]
def equals(self, other: Timestamp, precision: str = "ns") -> pa.BooleanArray:
if self.scale != other.scale:
raise ValueError("Cannot compare timestamps with different scales")
if len(other) == 1:
return self.equals_scalar(other.days[0], other.nanos[0], precision)
else:
return self.equals_array(other, precision)
[docs]
def equals_scalar(
self, days: int, nanos: int, precision: str = "ns"
) -> pa.BooleanArray:
delta_days, delta_nanos = self.difference_scalar(days, nanos)
if precision == "ns":
max_deviation = 0
elif precision == "us":
max_deviation = 999
elif precision == "ms":
max_deviation = 999_999
elif precision == "s":
max_deviation = 999_999_999
else:
raise ValueError(f"Unsupported precision: {precision}")
return _duration_arrays_within_tolerance(delta_days, delta_nanos, max_deviation)
[docs]
def equals_array(self, other: Timestamp, precision: str = "ns") -> pa.BooleanArray:
"""
Compare two Timestamps, returning a BooleanArray indicating
whether each element is equal.
The Timestamps must have the same scale, and the same length.
"""
if self.scale != other.scale:
raise ValueError("Cannot compare timestamps with different scales")
if len(self) != len(other):
raise ValueError("Timestamps must have the same length")
delta_days, delta_nanos = self.difference(other)
if precision == "ns":
max_deviation = 0
elif precision == "us":
max_deviation = 999
elif precision == "ms":
max_deviation = 999_999
elif precision == "s":
max_deviation = 999_999_999
else:
raise ValueError(f"Unsupported precision: {precision}")
return _duration_arrays_within_tolerance(delta_days, delta_nanos, max_deviation)
[docs]
def max(self) -> Timestamp:
"""
Compute the maximum time.
Returns
-------
max_time : `~adam_core.time.Timestamp`
The maximum time. If there are multiple maximum times,
one of them is returned.
"""
# Compute the maximum day
max_day = pc.max(self.days)
days_mask = pc.equal(self.days, max_day)
# Compute the maximum nanos for the maximum day
max_nanos = pc.max(self.nanos.filter(days_mask))
nanos_mask = pc.equal(self.nanos, max_nanos)
return self.apply_mask(pc.and_(days_mask, nanos_mask))[0]
[docs]
def min(self) -> Timestamp:
"""
Compute the minimum time.
Returns
-------
min_time : `~adam_core.time.Timestamp`
The minimum time. If there are multiple minimum times,
one of them is returned.
"""
# Compute the minimum day
min_day = pc.min(self.days)
days_mask = pc.equal(self.days, min_day)
# Compute the minimum nanos for the minimum day
min_nanos = pc.min(self.nanos.filter(days_mask))
nanos_mask = pc.equal(self.nanos, min_nanos)
return self.apply_mask(pc.and_(days_mask, nanos_mask))[0]
[docs]
@classmethod
def from_astropy(cls, astropy_time: astropy.time.Time) -> Timestamp:
"""Convert an astropy time to a quivr timestamp.
This is a lossy conversion, since astropy uses floating point
to represent times, while quivr uses integers.
The astropy time must use a scale supported by quivr. The
supported scales are "tai", "tt", "ut1", "utc", and "tdb".
"""
if astropy_time.scale not in SCALES:
raise ValueError(f"Unsupported scale: {astropy_time.scale}")
if astropy_time.isscalar:
return cls._from_astropy_scalar(astropy_time)
jd1, jd2 = astropy_time.jd1, astropy_time.jd2
days, remainder = divmod(jd1 - 2400000.5, 1)
remainder += jd2
mask = remainder < 0
np.add(remainder, 1, where=mask, out=remainder)
np.subtract(days, 1, where=mask, out=days)
mask = remainder >= 1
np.subtract(remainder, 1, where=mask, out=remainder)
np.add(days, 1, where=mask, out=days)
nanos = np.round(remainder * 86400 * 1e9)
# Floating point arithmetic can cause nanos to be 86400e9
# instead of 0, so we need to handle this case
nanos_full_day = nanos == 86400 * 1e9
days = np.where(nanos_full_day, days + 1, days).astype(np.int64)
nanos = np.where(nanos_full_day, 0, nanos)
return cls.from_kwargs(
scale=astropy_time.scale,
days=days,
nanos=nanos,
)
@classmethod
def _from_astropy_scalar(cls, astropy_time: astropy.time.Time) -> Timestamp:
"""Astropy times can be scalar-valued, which requires
separate handling because the numpy array functions won't
work.
"""
jd1, jd2 = astropy_time.jd1, astropy_time.jd2
days, remainder = divmod(jd1 - 2400000.5, 1)
remainder += jd2
if remainder < 0:
remainder += 1
days -= 1
if remainder >= 1:
remainder -= 1
days += 1
nanos = round(remainder * 86400 * 1e9)
return cls.from_kwargs(
scale=astropy_time.scale,
days=[int(days)],
nanos=[nanos],
)
[docs]
def to_astropy(self) -> astropy.time.Time:
"""
Convert the timestamp to an astropy time.
"""
fractional_days = self.fractional_days()
return astropy.time.Time(
val=self.days,
val2=fractional_days,
format="mjd",
scale=self.scale,
)
[docs]
def add_nanos(
self, nanos: pa.lib.Int64Array | int, check_range: bool = True
) -> Timestamp:
"""
Add nanoseconds to the timestamp. Negative nanoseconds are
allowed.
Parameters
----------
nanos : The nanoseconds to add. Can be a scalar or an array of
the same length as the timestamp. Must be in the range [-86400e9, 86400e9).
check_range : If True, check that the nanoseconds are in the
range [-86400e9, 86400e9). If False, the caller is
responsible for ensuring that the nanoseconds are in the
correct range.
"""
if check_range:
if isinstance(nanos, int):
if not -_NANOS_IN_DAY <= nanos < _NANOS_IN_DAY:
raise ValueError("Nanoseconds out of range")
else:
if not pc.all(
pc.and_(
pc.greater_equal(nanos, -_NANOS_IN_DAY),
pc.less(nanos, _NANOS_IN_DAY),
)
).as_py():
raise ValueError("Nanoseconds out of range")
nanos = pc.add_checked(self.nanos, nanos)
overflows = pc.greater_equal(nanos, _NANOS_IN_DAY)
underflows = pc.less(nanos, 0)
mask = pa.StructArray.from_arrays(
[overflows, underflows], names=["overflows", "underflows"]
)
nanos = pc.case_when(
mask,
pc.subtract(nanos, _NANOS_IN_DAY),
pc.add(nanos, _NANOS_IN_DAY),
nanos,
)
days = pc.case_when(
mask,
pc.add(self.days, 1),
pc.subtract(self.days, 1),
self.days,
)
v1 = self.set_column("days", days)
v2 = v1.set_column("nanos", nanos)
return v2
[docs]
def add_seconds(
self, seconds: pa.lib.Int64Array | int | pa.DoubleArray | float
) -> Timestamp:
"""
Add seconds to the timestamp. Negative seconds are supported.
Parameters
----------
seconds : The seconds to add. Can be a scalar or an array of
the same length as the timestamp. Must be in the range [-86400, 86400).
See Also
--------
add_nanos : Add nanoseconds to the timestamp. This method includes
a 'check_range' parameter that allows the caller to disable range
checking for performance reasons.
"""
nanos = pc.cast(pc.round(pc.multiply(seconds, 1_000_000_000)), pa.int64())
return self.add_nanos(nanos)
[docs]
def add_millis(self, millis: pa.lib.Int64Array | int) -> Timestamp:
"""
Add milliseconds to the timestamp. Negative milliseconds are
supported.
Parameters
----------
millis : The milliseconds to add. Can be a scalar or an array of
the same length as the timestamp. Must be in the range [-86400e3, 86400e3).
See Also
--------
add_nanos : Add nanoseconds to the timestamp. This method includes
a 'check_range' parameter that allows the caller to disable range
checking for performance reasons.
"""
nanos = pc.cast(pc.round(pc.multiply(millis, 1_000_000)), pa.int64())
return self.add_nanos(nanos)
[docs]
def add_micros(self, micros: pa.lib.Int64Array | int) -> Timestamp:
"""
Add microseconds to the timestamp. Negative microseconds are
supported.
Parameters
----------
micros : The microseconds to add. Can be a scalar or an array of
the same length as the timestamp. Must be in the range [-86400e6, 86400e6).
See Also
--------
add_nanos : Add nanoseconds to the timestamp. This method includes
a 'check_range' parameter that allows the caller to disable range
checking for performance reasons.
"""
nanos = pc.cast(pc.round(pc.multiply(micros, 1_000)), pa.int64())
return self.add_nanos(nanos)
[docs]
def add_days(self, days: pa.lib.Int64Array | int) -> Timestamp:
"""Add days to the timestamp.
Parameters
----------
days : The days to add. Can be a scalar or an array of the
same length as the timestamp. Use negative values to
subtract days.
"""
return self.set_column("days", pc.add(self.days, days))
[docs]
def add_fractional_days(
self, fractional_days: pa.lib.DoubleArray | float
) -> Timestamp:
"""
Add fractional days to the timestamp.
Parameters
----------
fractional_days : The fractional days to add. Can be a scalar
or an array of the same length as the timestamp. Use
negative values to subtract fractional days.
"""
day_part = pc.floor(fractional_days)
nano_part = pc.subtract(fractional_days, day_part)
days = pc.cast(day_part, pa.int64())
nanos = pc.cast(
pc.multiply(nano_part, _NANOS_IN_DAY),
options=pc.CastOptions(target_type=pa.int64(), allow_float_truncate=True),
)
return self.add_days(days).add_nanos(nanos)
[docs]
def difference_scalar(
self, days: int, nanos: int
) -> tuple[pa.Int64Array, pa.Int64Array]:
"""
Compute the difference between this timestamp and a scalar
timestamp.
The difference is computed as (self - scalar). The result is
presented as a tuple of (days, nanos). The nanos value is
always non-negative, in the range [0, 86400e9).
Parameters
----------
days : The days of the scalar timestamp.
nanos : The nanoseconds of the scalar timestamp.
Returns
-------
days : The difference in days. This value can be negative.
nanos : The difference in nanoseconds. This value is always
non-negative, in the range [0, 86400e9).
Examples
--------
>>> from adam_core.time import Timestamp
>>> ts = Timestamp.from_kwargs(days=[0, 1, 2], nanos=[200, 0, 100])
>>> have_days, have_nanos = ts.difference_scalar(1, 100)
>>> have_days.to_pylist()
[-1, -1, 1]
>>> have_nanos.to_pylist()
[100, 86399999999900, 0]
"""
days1 = pc.subtract(self.days, days)
nanos1 = pc.subtract(self.nanos, nanos)
overflows = pc.greater_equal(nanos1, _NANOS_IN_DAY)
underflows = pc.less(nanos1, 0)
mask = pa.StructArray.from_arrays(
[overflows, underflows], names=["overflows", "underflows"]
)
nanos2 = pc.case_when(
mask,
pc.subtract(nanos1, _NANOS_IN_DAY),
pc.add(nanos1, _NANOS_IN_DAY),
nanos1,
)
days2 = pc.case_when(
mask,
pc.add(days1, 1),
pc.subtract(days1, 1),
days1,
)
return days2, nanos2
[docs]
def difference(self, other: Timestamp) -> tuple[pa.Int64Array, pa.Int64Array]:
"""
Compute the element-wise difference between this timestamp and another.
"""
if self.scale != other.scale:
raise ValueError(
"Cannot compute difference between timestamps with different scales"
)
days1 = pc.subtract(self.days, other.days)
nanos1 = pc.subtract(self.nanos, other.nanos)
overflows = pc.greater_equal(nanos1, _NANOS_IN_DAY)
underflows = pc.less(nanos1, 0)
mask = pa.StructArray.from_arrays(
[overflows, underflows], names=["overflows", "underflows"]
)
nanos2 = pc.case_when(
mask,
pc.subtract(nanos1, _NANOS_IN_DAY),
pc.add(nanos1, _NANOS_IN_DAY),
nanos1,
)
days2 = pc.case_when(
mask,
pc.add(days1, 1),
pc.subtract(days1, 1),
days1,
)
return days2, nanos2
[docs]
def unique(self) -> Timestamp:
"""Return a new Timestamp table containing only the unique
elements from self. Order is not necessarily preserved.
"""
uniqued = self.table.group_by(["days", "nanos"]).aggregate([])
uniqued = uniqued.replace_schema_metadata(self.table.schema.metadata)
return Timestamp.from_pyarrow(uniqued)
[docs]
def rescale_astropy(self, new_scale: str) -> Timestamp:
if self.scale == new_scale:
return self
elif new_scale == "tai":
return Timestamp.from_astropy(self.to_astropy().tai)
elif new_scale == "utc":
return Timestamp.from_astropy(self.to_astropy().utc)
elif new_scale == "tt":
return Timestamp.from_astropy(self.to_astropy().tt)
elif new_scale == "ut1":
return Timestamp.from_astropy(self.to_astropy().ut1)
elif new_scale == "tdb":
return Timestamp.from_astropy(self.to_astropy().tdb)
else:
raise ValueError("Unknown scale: {}".format(new_scale))
def _tt_tdb_correction(
self, positive: bool, correction: np.ndarray | int
) -> np.ndarray:
"""Compute correction from TT to TDB in nanoseconds.
Correction in nanoseconds is used to bring the self value to TT from wherever it is.
Otherwise we can get a few nanoseconds difference in roundtrip.
If positive is False, add a minus sign to get TDB to TT correction.
Math from
gssc.esa.int/navipedia/index.php/Transformations_between_Time_Systems
Note that astropy/ERFA use a different algorithm that is based on
the location on Earth, but also seems to use a few approximations.
As a result the difference can be in some tens of microseconds.
"""
# Going into numpy and then back is considerably faster than doing the
# same calculation using pyarrow. The result is the same if delta is
# rounded before astype. As written, delta is truncated, which may produce
# a 1ns difference.
days = self.days.to_numpy() - 51_545
fracs = (self.nanos.to_numpy() + correction) / _NANOS_IN_DAY + 0.5
centuries = (days + fracs) / 36_525
g = np.radians(35999.050 * centuries + 357.528)
delta = np.sin(g + 0.0167 * np.sin(g)) * 1658000
if not positive:
delta = -delta
return delta.astype(np.int64)
def _leap_seconds_correction(
self, converter: _ErfaDayFracConverter, correction: np.ndarray | int
) -> np.ndarray:
"""Compute leap seconds correction, returned in nanoseconds.
Correction in nanoseconds is used to bring self from wherever it is to TAI.
Converter is an ERFA method used to get the actual leap seconds. An alternative
would be to download and lookup the leap seconds tables ourselves, which is left
as an exercise for the future.
"""
# ERFA methods take and return a pair of ndarrays for the full days and fractions of JD
old_days = self.days.to_numpy() + 2400000
old_frac = (self.nanos.to_numpy() + correction) / _NANOS_IN_DAY + 0.5
new_days, new_frac = converter(old_days, old_frac)
delta = ((new_days - old_days) + (new_frac - old_frac)) * _NANOS_IN_DAY
# Want nice round seconds, but in nanoseconds. Without rounding we occasionally
# get a stray nanosecond, which messes up roundtrip conversions.
delta = np.round(delta * 1e-9).astype(np.int64) * 1_000_000_000
return delta
[docs]
def rescale(self, new_scale: str) -> Timestamp:
"""
Convert this `Timestamp` to `new_scale`.
Notes
-----
- For `ut1` conversions, we delegate to `rescale_astropy()` because UT1 requires IERS
tables and interpolation that astropy manages.
- For conversions involving TDB, this implementation uses a different (non-Earth-location-
dependent) approximation than astropy/ERFA, and differences of 10-30 microseconds
versus astropy can occur. Round-trip conversions within this implementation are designed
to be stable (see tests).
See also
--------
`src/adam_core/time/tests/test_time.py`: correctness + benchmark tests for rescale.
"""
if self.scale == new_scale:
return self
# ut1 requires delta interpolated from tables published by the IERS.
# ERFA functions expect the delta to be provided. Astropy does the
# file downloading and interpolating, so stick with it here.
if self.scale == "ut1" or new_scale == "ut1":
return self.rescale_astropy(new_scale)
# Delta in nanoseconds
correction = None
if self.scale == "tt":
if new_scale == "tai":
correction = -_TAI_TT_NS_CORRECTION
elif new_scale == "utc":
correction = (
self._leap_seconds_correction(erfa.taiutc, 0)
- _TAI_TT_NS_CORRECTION
)
elif new_scale == "tdb":
correction = self._tt_tdb_correction(True, 0)
elif self.scale == "utc":
correction = self._leap_seconds_correction(erfa.utctai, 0)
if new_scale == "tt":
correction += _TAI_TT_NS_CORRECTION
elif new_scale == "tdb":
# We are effectively going UTC->TAI->TT->TDB with corrections here
correction += (
self._tt_tdb_correction(True, correction + _TAI_TT_NS_CORRECTION)
+ _TAI_TT_NS_CORRECTION
)
elif self.scale == "tai":
if new_scale == "tt":
correction = _TAI_TT_NS_CORRECTION
elif new_scale == "tdb":
correction = (
self._tt_tdb_correction(True, _TAI_TT_NS_CORRECTION)
+ _TAI_TT_NS_CORRECTION
)
elif new_scale == "tai":
correction = 0
elif new_scale == "utc":
correction = self._leap_seconds_correction(erfa.taiutc, 0)
elif self.scale == "tdb":
if new_scale == "tt":
correction = self._tt_tdb_correction(False, 0)
elif new_scale == "tai":
correction = self._tt_tdb_correction(False, 0) - _TAI_TT_NS_CORRECTION
elif new_scale == "utc":
correction = self._tt_tdb_correction(False, 0) - _TAI_TT_NS_CORRECTION
correction += self._leap_seconds_correction(erfa.taiutc, correction)
if correction is not None:
tmp = self.add_nanos(correction, check_range=False)
return Timestamp.from_kwargs(
days=tmp.days, nanos=tmp.nanos, scale=new_scale
)
raise ValueError(
"Rescale from {} to {} is not supported".format(self.scale, new_scale)
)
[docs]
def link(
self, other: Timestamp, precision: str = "ns"
) -> qv.MultiKeyLinkage[Timestamp, Timestamp]:
"""
Link this Timestamp to another. The default precision is nanoseconds, but if
other precisions are desired then both this class and the other Timestamp will
be rounded to the desired precision.
If the timescales are different, the other Timestamp will be rescaled to
this Timestamp's timescale.
Parameters
----------
other : The Timestamp to link to.
precision : The precision to use when linking. The default is 'ns'.
Returns
-------
linkage : A MultiKeyLinkage object that can be used to join the two Timestamps.
"""
if self.scale != other.scale:
other = other.rescale(self.scale)
rounded = self.rounded(precision)
other_rounded = other.rounded(precision)
left_keys = {"days": rounded.days, "nanos": rounded.nanos}
right_keys = {"days": other_rounded.days, "nanos": other_rounded.nanos}
return qv.MultiKeyLinkage(self, other, left_keys, right_keys)
def _duration_arrays_within_tolerance(
delta_days: pa.Int64Array, delta_nanos: pa.Int64Array, max_nanos_deviation: int
) -> pa.BooleanArray:
"""Return a boolean array indicating whether the delta_days and delta_nanos
arrays are within the specified tolerance.
The max_nanos_deviation should be the maximum number of
nanoseconds that the the two arrays can deviate to still be
considered 'within tolerance'.
"""
if max_nanos_deviation == 0:
return pc.and_(pc.equal(delta_days, 0), pc.equal(delta_nanos, 0))
cond1 = pc.and_(
pc.equal(delta_days, 0), pc.less(pc.abs(delta_nanos), max_nanos_deviation)
)
cond2 = pc.and_(
pc.equal(delta_days, -1),
pc.greater_equal(pc.abs(delta_nanos), 86400 * 1e9 - max_nanos_deviation),
)
return pc.or_(cond1, cond2)