Source code for adam_core.time.time

"""
Time utilities and a fast, integer-backed timestamp representation.

This module defines `Timestamp`, a quivr table storing times as integer MJD days plus
integer nanoseconds within the day, tagged with a time scale (`tai`, `tt`, `utc`, `tdb`, `ut1`).

`Timestamp.rescale()` implements fast conversions between supported scales. For UT1, we
delegate to astropy because UT1 requires IERS tables and interpolation.

### Empirical validation summary (2026-01)

We validated the impact of the `rescale()` implementation choice (default implementation vs
an astropy-backed baseline) on an Asteroid Institute “real world” pipeline:

- Rubin X05 MPC observation timestamps (UTC) + RA/Dec astrometry
- SBDB orbit queries (for known objects)
- Ephemeris generation with `adam_assist.ASSISTPropagator`
- Residuals computed via `adam_core.coordinates.residuals.Residuals` on equatorial spherical
  coordinates (RA/Dec as lon/lat)
- Two runs: default `Timestamp.rescale` vs a global override where `Timestamp.rescale`
  dispatches to `Timestamp.rescale_astropy` for all internal conversions

Results (largest run: 500 objects, max 100 observations/object; 43,919 ephemeris points):

- Predicted position delta (default vs astropy baseline): max ~0.001 mas.
- Residuals to Rubin astrometry (arcsec): p50 ~0.031, p90 ~0.088 for both methods.
- Residual delta (default - baseline): ~0 mas at p50/p90 (no measurable change).

We also confirmed these UTC observation times exhibit a default-vs-astropy UTC→TDB offset
in the expected tens-of-microseconds range (~14.6–21.8 µs for a sampled subset).
"""

from __future__ import annotations

import hashlib
from typing import Callable

import astropy.time
import erfa
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import quivr as qv

SCALES = {
    "tai",
    "tt",
    "ut1",
    "utc",
    "tdb",
}

# The Modified Julian Date of the J2000 epoch in TDB scale:
_J2000_TDB_MJD = 51544.5

# Time constants
_SECONDS_IN_DAY = 86_400
_NANOS_IN_DAY = 86_400_000_000_000  # int, exact
_NANOS_IN_DAY_F64 = float(_NANOS_IN_DAY)  # for pyarrow float division
_TAI_TT_NS_CORRECTION = 32_184_000_000  # TT = TAI + 32.184s

# ERFA converters used by `_leap_seconds_correction`:
# They accept full-day and fractional-day JD arrays and return the converted pair.
_ErfaDayFracConverter = Callable[
    [np.ndarray, np.ndarray], tuple[np.ndarray, np.ndarray]
]


[docs] class Timestamp(qv.Table): # Scale, the rate at which time passes: scale = qv.StringAttribute(default="tai") # Days since MJD epoch (1858-11-17T00:00:00): days = qv.Int64Column() # Nanos since start of day: nanos = qv.Int64Column()
[docs] def micros(self) -> pa.Int64Array: return pc.divide(self.nanos, 1_000)
[docs] def millis(self) -> pa.Int64Array: return pc.divide(self.nanos, 1_000_000)
[docs] def seconds(self) -> pa.Int64Array: return pc.divide(self.nanos, 1_000_000_000)
[docs] def key(self, *, scale: str | None = "tdb") -> np.ndarray: """ Return an int64 key for each timestamp: (days * NANOS_IN_DAY + nanos). This is useful for fast grouping/uniquing and as a stable cache key when paired with a specific time scale. """ if len(self) == 0: return np.empty(0, dtype=np.int64) t = self if scale is None else self.rescale(scale) days = t.days.to_numpy(zero_copy_only=False).astype(np.int64) nanos = t.nanos.to_numpy(zero_copy_only=False).astype(np.int64) return days * _NANOS_IN_DAY + nanos
[docs] def signature(self, *, scale: str | None = "tdb") -> tuple[int, int, int, int]: """ Return a cheap signature for this Timestamp array. The signature is (n, first_key, last_key, sum_mod) where keys are produced by `key()`. """ n = int(len(self)) if n == 0: return 0, 0, 0, 0 key = self.key(scale=scale) first = int(key[0]) last = int(key[-1]) sum_mod = int(np.sum(key, dtype=np.int64) & np.int64(0x7FFF_FFFF_FFFF_FFFF)) return n, first, last, sum_mod
[docs] def cache_digest(self, *, scale: str | None = "tdb") -> int: """ Return an order-sensitive 64-bit digest of timestamp keys. This is intended for cache keys where row alignment matters. """ if len(self) == 0: return 0 key = np.asarray(self.key(scale=scale), dtype="<i8") payload = np.ascontiguousarray(key).view(np.uint8) digest = hashlib.blake2b(payload, digest_size=8).digest() return int.from_bytes(digest, byteorder="little", signed=False)
[docs] def mjd(self) -> pa.lib.DoubleArray: return pc.add(self.days, self.fractional_days())
[docs] def jd(self) -> pa.lib.DoubleArray: return pc.add(self.mjd(), 2400000.5)
[docs] def et(self) -> pa.lib.DoubleArray: """ Returns the times as ET seconds in a pyarrow array. """ tdb = self.rescale("tdb") mjd = tdb.mjd() return pc.multiply(pc.subtract(mjd, _J2000_TDB_MJD), _SECONDS_IN_DAY)
[docs] def to_numpy(self) -> np.ndarray: """ Returns the times as TDB MJDs in a numpy array. """ return self.rescale("tdb").mjd().to_numpy(False)
[docs] def to_iso8601(self) -> pa.lib.StringArray: """ Returns the times as ISO 8601 strings in a pyarrow array. """ if len(self) == 0: return pa.array([], type=pa.string()) return pa.array(self.to_astropy().isot, type=pa.string())
[docs] @classmethod def from_iso8601( cls, iso: pa.lib.StringArray | list[str], scale="utc" ) -> Timestamp: """ Create a Timestamp from ISO 8601 strings (for example, '2020-01-02T14:15:16'). """ return cls.from_astropy(astropy.time.Time(iso, format="isot", scale=scale))
[docs] @classmethod def from_mjd(cls, mjd: pa.lib.DoubleArray, scale: str = "tai") -> Timestamp: days = pc.floor(mjd) fractional_days = pc.subtract(mjd, days) days = pc.cast(days, pa.int64()) nanos = pc.cast( pc.round(pc.multiply(fractional_days, _NANOS_IN_DAY)), pa.int64() ) return cls.from_kwargs(days=days, nanos=nanos, scale=scale)
[docs] @classmethod def from_jd(cls, jd: pa.lib.DoubleArray, scale: str = "tai") -> Timestamp: return cls.from_mjd(pc.subtract(jd, 2400000.5), scale)
[docs] @classmethod def from_et(cls, et: pa.lib.DoubleArray, scale: str = "tdb") -> Timestamp: return cls.from_mjd(pc.divide(et, 86400), scale)
[docs] def fractional_days(self) -> pa.lib.DoubleArray: # IMPORTANT: pyarrow integer / integer division truncates, so ensure float divisor. return pc.divide(self.nanos, _NANOS_IN_DAY_F64)
[docs] def rounded(self, precision: str = "ns") -> Timestamp: if precision == "ns": return self elif precision == "us": nanos = pc.multiply(self.micros(), 1_000) elif precision == "ms": nanos = pc.multiply(self.millis(), 1_000_000) elif precision == "s": nanos = pc.multiply(self.seconds(), 1_000_000_000) else: raise ValueError(f"Unsupported precision: {precision}") return self.set_column("nanos", nanos)
[docs] def equals(self, other: Timestamp, precision: str = "ns") -> pa.BooleanArray: if self.scale != other.scale: raise ValueError("Cannot compare timestamps with different scales") if len(other) == 1: return self.equals_scalar(other.days[0], other.nanos[0], precision) else: return self.equals_array(other, precision)
[docs] def equals_scalar( self, days: int, nanos: int, precision: str = "ns" ) -> pa.BooleanArray: delta_days, delta_nanos = self.difference_scalar(days, nanos) if precision == "ns": max_deviation = 0 elif precision == "us": max_deviation = 999 elif precision == "ms": max_deviation = 999_999 elif precision == "s": max_deviation = 999_999_999 else: raise ValueError(f"Unsupported precision: {precision}") return _duration_arrays_within_tolerance(delta_days, delta_nanos, max_deviation)
[docs] def equals_array(self, other: Timestamp, precision: str = "ns") -> pa.BooleanArray: """ Compare two Timestamps, returning a BooleanArray indicating whether each element is equal. The Timestamps must have the same scale, and the same length. """ if self.scale != other.scale: raise ValueError("Cannot compare timestamps with different scales") if len(self) != len(other): raise ValueError("Timestamps must have the same length") delta_days, delta_nanos = self.difference(other) if precision == "ns": max_deviation = 0 elif precision == "us": max_deviation = 999 elif precision == "ms": max_deviation = 999_999 elif precision == "s": max_deviation = 999_999_999 else: raise ValueError(f"Unsupported precision: {precision}") return _duration_arrays_within_tolerance(delta_days, delta_nanos, max_deviation)
[docs] def max(self) -> Timestamp: """ Compute the maximum time. Returns ------- max_time : `~adam_core.time.Timestamp` The maximum time. If there are multiple maximum times, one of them is returned. """ # Compute the maximum day max_day = pc.max(self.days) days_mask = pc.equal(self.days, max_day) # Compute the maximum nanos for the maximum day max_nanos = pc.max(self.nanos.filter(days_mask)) nanos_mask = pc.equal(self.nanos, max_nanos) return self.apply_mask(pc.and_(days_mask, nanos_mask))[0]
[docs] def min(self) -> Timestamp: """ Compute the minimum time. Returns ------- min_time : `~adam_core.time.Timestamp` The minimum time. If there are multiple minimum times, one of them is returned. """ # Compute the minimum day min_day = pc.min(self.days) days_mask = pc.equal(self.days, min_day) # Compute the minimum nanos for the minimum day min_nanos = pc.min(self.nanos.filter(days_mask)) nanos_mask = pc.equal(self.nanos, min_nanos) return self.apply_mask(pc.and_(days_mask, nanos_mask))[0]
[docs] @classmethod def from_astropy(cls, astropy_time: astropy.time.Time) -> Timestamp: """Convert an astropy time to a quivr timestamp. This is a lossy conversion, since astropy uses floating point to represent times, while quivr uses integers. The astropy time must use a scale supported by quivr. The supported scales are "tai", "tt", "ut1", "utc", and "tdb". """ if astropy_time.scale not in SCALES: raise ValueError(f"Unsupported scale: {astropy_time.scale}") if astropy_time.isscalar: return cls._from_astropy_scalar(astropy_time) jd1, jd2 = astropy_time.jd1, astropy_time.jd2 days, remainder = divmod(jd1 - 2400000.5, 1) remainder += jd2 mask = remainder < 0 np.add(remainder, 1, where=mask, out=remainder) np.subtract(days, 1, where=mask, out=days) mask = remainder >= 1 np.subtract(remainder, 1, where=mask, out=remainder) np.add(days, 1, where=mask, out=days) nanos = np.round(remainder * 86400 * 1e9) # Floating point arithmetic can cause nanos to be 86400e9 # instead of 0, so we need to handle this case nanos_full_day = nanos == 86400 * 1e9 days = np.where(nanos_full_day, days + 1, days).astype(np.int64) nanos = np.where(nanos_full_day, 0, nanos) return cls.from_kwargs( scale=astropy_time.scale, days=days, nanos=nanos, )
@classmethod def _from_astropy_scalar(cls, astropy_time: astropy.time.Time) -> Timestamp: """Astropy times can be scalar-valued, which requires separate handling because the numpy array functions won't work. """ jd1, jd2 = astropy_time.jd1, astropy_time.jd2 days, remainder = divmod(jd1 - 2400000.5, 1) remainder += jd2 if remainder < 0: remainder += 1 days -= 1 if remainder >= 1: remainder -= 1 days += 1 nanos = round(remainder * 86400 * 1e9) return cls.from_kwargs( scale=astropy_time.scale, days=[int(days)], nanos=[nanos], )
[docs] def to_astropy(self) -> astropy.time.Time: """ Convert the timestamp to an astropy time. """ fractional_days = self.fractional_days() return astropy.time.Time( val=self.days, val2=fractional_days, format="mjd", scale=self.scale, )
[docs] def add_nanos( self, nanos: pa.lib.Int64Array | int, check_range: bool = True ) -> Timestamp: """ Add nanoseconds to the timestamp. Negative nanoseconds are allowed. Parameters ---------- nanos : The nanoseconds to add. Can be a scalar or an array of the same length as the timestamp. Must be in the range [-86400e9, 86400e9). check_range : If True, check that the nanoseconds are in the range [-86400e9, 86400e9). If False, the caller is responsible for ensuring that the nanoseconds are in the correct range. """ if check_range: if isinstance(nanos, int): if not -_NANOS_IN_DAY <= nanos < _NANOS_IN_DAY: raise ValueError("Nanoseconds out of range") else: if not pc.all( pc.and_( pc.greater_equal(nanos, -_NANOS_IN_DAY), pc.less(nanos, _NANOS_IN_DAY), ) ).as_py(): raise ValueError("Nanoseconds out of range") nanos = pc.add_checked(self.nanos, nanos) overflows = pc.greater_equal(nanos, _NANOS_IN_DAY) underflows = pc.less(nanos, 0) mask = pa.StructArray.from_arrays( [overflows, underflows], names=["overflows", "underflows"] ) nanos = pc.case_when( mask, pc.subtract(nanos, _NANOS_IN_DAY), pc.add(nanos, _NANOS_IN_DAY), nanos, ) days = pc.case_when( mask, pc.add(self.days, 1), pc.subtract(self.days, 1), self.days, ) v1 = self.set_column("days", days) v2 = v1.set_column("nanos", nanos) return v2
[docs] def add_seconds( self, seconds: pa.lib.Int64Array | int | pa.DoubleArray | float ) -> Timestamp: """ Add seconds to the timestamp. Negative seconds are supported. Parameters ---------- seconds : The seconds to add. Can be a scalar or an array of the same length as the timestamp. Must be in the range [-86400, 86400). See Also -------- add_nanos : Add nanoseconds to the timestamp. This method includes a 'check_range' parameter that allows the caller to disable range checking for performance reasons. """ nanos = pc.cast(pc.round(pc.multiply(seconds, 1_000_000_000)), pa.int64()) return self.add_nanos(nanos)
[docs] def add_millis(self, millis: pa.lib.Int64Array | int) -> Timestamp: """ Add milliseconds to the timestamp. Negative milliseconds are supported. Parameters ---------- millis : The milliseconds to add. Can be a scalar or an array of the same length as the timestamp. Must be in the range [-86400e3, 86400e3). See Also -------- add_nanos : Add nanoseconds to the timestamp. This method includes a 'check_range' parameter that allows the caller to disable range checking for performance reasons. """ nanos = pc.cast(pc.round(pc.multiply(millis, 1_000_000)), pa.int64()) return self.add_nanos(nanos)
[docs] def add_micros(self, micros: pa.lib.Int64Array | int) -> Timestamp: """ Add microseconds to the timestamp. Negative microseconds are supported. Parameters ---------- micros : The microseconds to add. Can be a scalar or an array of the same length as the timestamp. Must be in the range [-86400e6, 86400e6). See Also -------- add_nanos : Add nanoseconds to the timestamp. This method includes a 'check_range' parameter that allows the caller to disable range checking for performance reasons. """ nanos = pc.cast(pc.round(pc.multiply(micros, 1_000)), pa.int64()) return self.add_nanos(nanos)
[docs] def add_days(self, days: pa.lib.Int64Array | int) -> Timestamp: """Add days to the timestamp. Parameters ---------- days : The days to add. Can be a scalar or an array of the same length as the timestamp. Use negative values to subtract days. """ return self.set_column("days", pc.add(self.days, days))
[docs] def add_fractional_days( self, fractional_days: pa.lib.DoubleArray | float ) -> Timestamp: """ Add fractional days to the timestamp. Parameters ---------- fractional_days : The fractional days to add. Can be a scalar or an array of the same length as the timestamp. Use negative values to subtract fractional days. """ day_part = pc.floor(fractional_days) nano_part = pc.subtract(fractional_days, day_part) days = pc.cast(day_part, pa.int64()) nanos = pc.cast( pc.multiply(nano_part, _NANOS_IN_DAY), options=pc.CastOptions(target_type=pa.int64(), allow_float_truncate=True), ) return self.add_days(days).add_nanos(nanos)
[docs] def difference_scalar( self, days: int, nanos: int ) -> tuple[pa.Int64Array, pa.Int64Array]: """ Compute the difference between this timestamp and a scalar timestamp. The difference is computed as (self - scalar). The result is presented as a tuple of (days, nanos). The nanos value is always non-negative, in the range [0, 86400e9). Parameters ---------- days : The days of the scalar timestamp. nanos : The nanoseconds of the scalar timestamp. Returns ------- days : The difference in days. This value can be negative. nanos : The difference in nanoseconds. This value is always non-negative, in the range [0, 86400e9). Examples -------- >>> from adam_core.time import Timestamp >>> ts = Timestamp.from_kwargs(days=[0, 1, 2], nanos=[200, 0, 100]) >>> have_days, have_nanos = ts.difference_scalar(1, 100) >>> have_days.to_pylist() [-1, -1, 1] >>> have_nanos.to_pylist() [100, 86399999999900, 0] """ days1 = pc.subtract(self.days, days) nanos1 = pc.subtract(self.nanos, nanos) overflows = pc.greater_equal(nanos1, _NANOS_IN_DAY) underflows = pc.less(nanos1, 0) mask = pa.StructArray.from_arrays( [overflows, underflows], names=["overflows", "underflows"] ) nanos2 = pc.case_when( mask, pc.subtract(nanos1, _NANOS_IN_DAY), pc.add(nanos1, _NANOS_IN_DAY), nanos1, ) days2 = pc.case_when( mask, pc.add(days1, 1), pc.subtract(days1, 1), days1, ) return days2, nanos2
[docs] def difference(self, other: Timestamp) -> tuple[pa.Int64Array, pa.Int64Array]: """ Compute the element-wise difference between this timestamp and another. """ if self.scale != other.scale: raise ValueError( "Cannot compute difference between timestamps with different scales" ) days1 = pc.subtract(self.days, other.days) nanos1 = pc.subtract(self.nanos, other.nanos) overflows = pc.greater_equal(nanos1, _NANOS_IN_DAY) underflows = pc.less(nanos1, 0) mask = pa.StructArray.from_arrays( [overflows, underflows], names=["overflows", "underflows"] ) nanos2 = pc.case_when( mask, pc.subtract(nanos1, _NANOS_IN_DAY), pc.add(nanos1, _NANOS_IN_DAY), nanos1, ) days2 = pc.case_when( mask, pc.add(days1, 1), pc.subtract(days1, 1), days1, ) return days2, nanos2
[docs] def unique(self) -> Timestamp: """Return a new Timestamp table containing only the unique elements from self. Order is not necessarily preserved. """ uniqued = self.table.group_by(["days", "nanos"]).aggregate([]) uniqued = uniqued.replace_schema_metadata(self.table.schema.metadata) return Timestamp.from_pyarrow(uniqued)
[docs] def rescale_astropy(self, new_scale: str) -> Timestamp: if self.scale == new_scale: return self elif new_scale == "tai": return Timestamp.from_astropy(self.to_astropy().tai) elif new_scale == "utc": return Timestamp.from_astropy(self.to_astropy().utc) elif new_scale == "tt": return Timestamp.from_astropy(self.to_astropy().tt) elif new_scale == "ut1": return Timestamp.from_astropy(self.to_astropy().ut1) elif new_scale == "tdb": return Timestamp.from_astropy(self.to_astropy().tdb) else: raise ValueError("Unknown scale: {}".format(new_scale))
def _tt_tdb_correction( self, positive: bool, correction: np.ndarray | int ) -> np.ndarray: """Compute correction from TT to TDB in nanoseconds. Correction in nanoseconds is used to bring the self value to TT from wherever it is. Otherwise we can get a few nanoseconds difference in roundtrip. If positive is False, add a minus sign to get TDB to TT correction. Math from gssc.esa.int/navipedia/index.php/Transformations_between_Time_Systems Note that astropy/ERFA use a different algorithm that is based on the location on Earth, but also seems to use a few approximations. As a result the difference can be in some tens of microseconds. """ # Going into numpy and then back is considerably faster than doing the # same calculation using pyarrow. The result is the same if delta is # rounded before astype. As written, delta is truncated, which may produce # a 1ns difference. days = self.days.to_numpy() - 51_545 fracs = (self.nanos.to_numpy() + correction) / _NANOS_IN_DAY + 0.5 centuries = (days + fracs) / 36_525 g = np.radians(35999.050 * centuries + 357.528) delta = np.sin(g + 0.0167 * np.sin(g)) * 1658000 if not positive: delta = -delta return delta.astype(np.int64) def _leap_seconds_correction( self, converter: _ErfaDayFracConverter, correction: np.ndarray | int ) -> np.ndarray: """Compute leap seconds correction, returned in nanoseconds. Correction in nanoseconds is used to bring self from wherever it is to TAI. Converter is an ERFA method used to get the actual leap seconds. An alternative would be to download and lookup the leap seconds tables ourselves, which is left as an exercise for the future. """ # ERFA methods take and return a pair of ndarrays for the full days and fractions of JD old_days = self.days.to_numpy() + 2400000 old_frac = (self.nanos.to_numpy() + correction) / _NANOS_IN_DAY + 0.5 new_days, new_frac = converter(old_days, old_frac) delta = ((new_days - old_days) + (new_frac - old_frac)) * _NANOS_IN_DAY # Want nice round seconds, but in nanoseconds. Without rounding we occasionally # get a stray nanosecond, which messes up roundtrip conversions. delta = np.round(delta * 1e-9).astype(np.int64) * 1_000_000_000 return delta
[docs] def rescale(self, new_scale: str) -> Timestamp: """ Convert this `Timestamp` to `new_scale`. Notes ----- - For `ut1` conversions, we delegate to `rescale_astropy()` because UT1 requires IERS tables and interpolation that astropy manages. - For conversions involving TDB, this implementation uses a different (non-Earth-location- dependent) approximation than astropy/ERFA, and differences of 10-30 microseconds versus astropy can occur. Round-trip conversions within this implementation are designed to be stable (see tests). See also -------- `src/adam_core/time/tests/test_time.py`: correctness + benchmark tests for rescale. """ if self.scale == new_scale: return self # ut1 requires delta interpolated from tables published by the IERS. # ERFA functions expect the delta to be provided. Astropy does the # file downloading and interpolating, so stick with it here. if self.scale == "ut1" or new_scale == "ut1": return self.rescale_astropy(new_scale) # Delta in nanoseconds correction = None if self.scale == "tt": if new_scale == "tai": correction = -_TAI_TT_NS_CORRECTION elif new_scale == "utc": correction = ( self._leap_seconds_correction(erfa.taiutc, 0) - _TAI_TT_NS_CORRECTION ) elif new_scale == "tdb": correction = self._tt_tdb_correction(True, 0) elif self.scale == "utc": correction = self._leap_seconds_correction(erfa.utctai, 0) if new_scale == "tt": correction += _TAI_TT_NS_CORRECTION elif new_scale == "tdb": # We are effectively going UTC->TAI->TT->TDB with corrections here correction += ( self._tt_tdb_correction(True, correction + _TAI_TT_NS_CORRECTION) + _TAI_TT_NS_CORRECTION ) elif self.scale == "tai": if new_scale == "tt": correction = _TAI_TT_NS_CORRECTION elif new_scale == "tdb": correction = ( self._tt_tdb_correction(True, _TAI_TT_NS_CORRECTION) + _TAI_TT_NS_CORRECTION ) elif new_scale == "tai": correction = 0 elif new_scale == "utc": correction = self._leap_seconds_correction(erfa.taiutc, 0) elif self.scale == "tdb": if new_scale == "tt": correction = self._tt_tdb_correction(False, 0) elif new_scale == "tai": correction = self._tt_tdb_correction(False, 0) - _TAI_TT_NS_CORRECTION elif new_scale == "utc": correction = self._tt_tdb_correction(False, 0) - _TAI_TT_NS_CORRECTION correction += self._leap_seconds_correction(erfa.taiutc, correction) if correction is not None: tmp = self.add_nanos(correction, check_range=False) return Timestamp.from_kwargs( days=tmp.days, nanos=tmp.nanos, scale=new_scale ) raise ValueError( "Rescale from {} to {} is not supported".format(self.scale, new_scale) )
def _duration_arrays_within_tolerance( delta_days: pa.Int64Array, delta_nanos: pa.Int64Array, max_nanos_deviation: int ) -> pa.BooleanArray: """Return a boolean array indicating whether the delta_days and delta_nanos arrays are within the specified tolerance. The max_nanos_deviation should be the maximum number of nanoseconds that the the two arrays can deviate to still be considered 'within tolerance'. """ if max_nanos_deviation == 0: return pc.and_(pc.equal(delta_days, 0), pc.equal(delta_nanos, 0)) cond1 = pc.and_( pc.equal(delta_days, 0), pc.less(pc.abs(delta_nanos), max_nanos_deviation) ) cond2 = pc.and_( pc.equal(delta_days, -1), pc.greater_equal(pc.abs(delta_nanos), 86400 * 1e9 - max_nanos_deviation), ) return pc.or_(cond1, cond2)