import logging
from dataclasses import asdict, dataclass
from typing import Optional, Tuple
import numpy as np
import pyarrow.compute as pc
import quivr as qv
from astropy.time import Time
from ..time import Timestamp
STRING100 = 100
STRING25 = 25
logger = logging.getLogger(__name__)
[docs]
@dataclass
class ObservatoryObsContext:
mpcCode: str
name: Optional[str] = None
def __post_init__(self):
assert len(self.mpcCode) in [3, 4]
if self.name is not None:
assert len(self.name) <= STRING100
[docs]
@dataclass
class SubmitterObsContext:
name: str
institution: Optional[str] = None
def __post_init__(self):
assert len(self.name) <= STRING100
if self.institution is not None:
assert len(self.institution) <= STRING100
[docs]
@dataclass
class TelescopeObsContext:
design: str
aperture: float
detector: str
name: Optional[str] = None
fRatio: Optional[float] = None
filter: Optional[str] = None
arraySize: Optional[str] = None
pixelSize: Optional[float] = None
def __post_init__(self):
if self.name is not None:
assert len(self.name) <= STRING100
assert len(self.design) <= STRING25
assert self.aperture > 0
assert len(self.detector) <= STRING25
if self.fRatio is not None:
assert self.fRatio > 0
if self.filter is not None:
assert len(self.filter) <= STRING25
if self.arraySize is not None:
assert len(self.arraySize) <= STRING25
if self.pixelSize is not None:
assert self.pixelSize > 0
[docs]
@dataclass
class SoftwareObsContext:
astrometry: Optional[str] = None
fitOrder: Optional[str] = None
photometry: Optional[str] = None
objectDetection: Optional[str] = None
def __post_init__(self):
if self.astrometry is not None:
assert len(self.astrometry) <= STRING100
if self.fitOrder is not None:
assert len(self.fitOrder) <= STRING25
if self.photometry is not None:
assert len(self.photometry) <= STRING100
if self.objectDetection is not None:
assert len(self.objectDetection) <= STRING100
[docs]
@dataclass
class ObsContext:
observatory: ObservatoryObsContext
submitter: SubmitterObsContext
observers: list[str]
measurers: list[str]
telescope: TelescopeObsContext
software: Optional[SoftwareObsContext] = None
coinvestigators: Optional[list[str]] = None
collaborators: Optional[list[str]] = None
fundingSource: Optional[str] = None
comments: Optional[list[str]] = None
def __post_init__(self):
assert len(self.observers) > 0
for observer in self.observers:
assert len(observer) <= STRING100
assert len(self.measurers) > 0
for measurer in self.measurers:
assert len(measurer) <= STRING100
if self.coinvestigators is not None:
assert len(self.coinvestigators) > 0
for coinvestigator in self.coinvestigators:
assert len(coinvestigator) <= STRING100
if self.collaborators is not None:
assert len(self.collaborators) > 0
for collaborator in self.collaborators:
assert len(collaborator) <= STRING100
if self.fundingSource is not None:
assert len(self.fundingSource) <= STRING100
if self.comments is not None:
for comment in self.comments:
assert len(comment) <= STRING100
[docs]
def to_string(self) -> str:
lines = []
for k, v in asdict(self).items():
if isinstance(v, dict):
lines.append(f"# {k}")
for k2, v2 in v.items():
if v2 is not None:
lines.append(f"! {k2} {v2}")
else:
if v is not None:
if k in [
"observers",
"measurers",
"coinvestigators",
"collaborators",
]:
lines.append(f"# {k}")
for name in v:
lines.append(f"! name {name}")
elif k == "fundingSource":
lines.append(f"# fundingSource {v}")
elif k == "comments":
if len(v) > 0:
lines.append("# comment")
for comment in v:
lines.append(f"! line {comment}")
return "\n".join(lines) + "\n"
[docs]
class ADESObservations(qv.Table):
permID = qv.LargeStringColumn(nullable=True)
provID = qv.LargeStringColumn(nullable=True)
trkSub = qv.LargeStringColumn(nullable=True)
obsSubID = qv.LargeStringColumn(nullable=True)
obsTime = Timestamp.as_column()
rmsTime = qv.Float64Column(nullable=True)
ra = qv.Float64Column()
dec = qv.Float64Column()
# ADES uses arcseconds for rmsRA and rmsDec
# rmsRA is also multiplied by cos(dec)
rmsRACosDec = qv.Float64Column(
nullable=True,
validator=qv.validators.and_(qv.validators.ge(10e-8), qv.validators.lt(1e2)),
)
rmsDec = qv.Float64Column(
nullable=True,
validator=qv.validators.and_(qv.validators.ge(10e-8), qv.validators.lt(1e2)),
)
rmsCorr = qv.Float64Column(nullable=True)
mag = qv.Float64Column(nullable=True)
rmsMag = qv.Float64Column(nullable=True)
band = qv.LargeStringColumn(nullable=True)
stn = qv.LargeStringColumn()
mode = qv.LargeStringColumn()
astCat = qv.LargeStringColumn()
photCat = qv.LargeStringColumn(nullable=True)
logSNR = qv.Float64Column(nullable=True)
seeing = qv.Float64Column(nullable=True)
exp = qv.Float64Column(nullable=True)
remarks = qv.LargeStringColumn(nullable=True)
[docs]
def ADES_to_string(
observations: ADESObservations,
obs_contexts: dict[str, ObsContext],
seconds_precision: int = 3,
columns_precision: dict[str, int] = {
"ra": 9,
"dec": 9,
"rmsRACosDec": 5,
"rmsDec": 5,
"rmsCorr": 8,
"mag": 4,
"rmsMag": 4,
"exp": 2,
"logSNR": 2,
"seeing": 2,
},
) -> str:
"""
Write ADES observations to a string.
Parameters
----------
observations : ADESObservations
The observations to write to a string.
obs_contexts : dict[str, ObsContext]
A dictionary of observatory codes and their corresponding ObsContexts to use
as the context headers for the different observatory codes in the observations.
seconds_precision : int, optional
The precision to use for the seconds in the obsTime field, by default 3.
columns_precision : dict[str, int], optional
A dictionary of column names and their corresponding precision to use when writing
the observations to the file, by default {
"ra": 8,
"dec": 8,
"rmsRACosDec": 4,
"rmsDec": 4,
"mag": 2,
"rmsMag": 2,
}
The MPC enforces strict limits on these and submitters may need permission to send
high-precision data.
Returns
-------
ades_string : str
The ADES observations as a string.
"""
ades_string = "# version=2022\n"
unique_observatories = observations.stn.unique().to_numpy(zero_copy_only=False)
unique_observatories.sort()
for obs in unique_observatories:
if obs not in obs_contexts:
raise ValueError(f"Observatory {obs} not found in obs_contexts")
observations_obscode = observations.select("stn", obs)
observations_obscode = observations_obscode.sort_by(
[
("provID", "ascending"),
("permID", "ascending"),
("trkSub", "ascending"),
("obsTime.days", "ascending"),
("obsTime.nanos", "ascending"),
]
)
id_present = False
if not pc.all(pc.is_null(observations_obscode.permID)).as_py():
id_present = True
if not pc.all(pc.is_null(observations_obscode.provID)).as_py():
id_present = True
if not pc.all(pc.is_null(observations_obscode.trkSub)).as_py():
id_present = True
if not id_present:
err = (
"At least one of permID, provID, or trkSub should\n"
"be present in observations."
)
raise ValueError(err)
# Write the observatory context block
obs_context = obs_contexts[obs]
ades_string += obs_context.to_string()
# Write the observations block (we first convert
# to a pandas dataframe)
ades = observations_obscode.to_dataframe()
# Convert the timestamp to ISOT with the desired precision
observation_times = Time(
observations_obscode.obsTime.rescale("utc")
.mjd()
.to_numpy(zero_copy_only=False),
format="mjd",
precision=seconds_precision,
)
ades.insert(
4,
"obsTime",
np.array([i + "Z" for i in observation_times.utc.isot]),
)
ades.drop(columns=["obsTime.days", "obsTime.nanos"], inplace=True)
ades.dropna(how="all", axis=1, inplace=True)
# Change the precision of some of the columns to conform
# to MPC standards
for col, prec_col in columns_precision.items():
if col in ades.columns:
ades[col] = [
f"{i:.{prec_col}f}" if i is not None or not np.isnan(i) else ""
for i in ades[col]
]
# Rename the columns to match the ADES format
ades.rename(columns={"rmsRACosDec": "rmsRA"}, inplace=True)
ades_string += ades.to_csv(
sep="|", header=True, index=False, float_format="%.16f"
)
return ades_string
def _data_dict_to_table(data_dict: dict[str, list[str]]) -> ADESObservations:
if not data_dict:
return ADESObservations.empty()
# Get the set of known columns from ADESObservations
known_columns = set(ADESObservations.empty().table.column_names)
# Check for unknown columns
unknown_columns = set(data_dict.keys()) - known_columns
if unknown_columns:
logger.warning(
f"Found unknown ADES columns that will be ignored: {unknown_columns}"
)
# Convert every value that is empty string or whitespace to None
for col in data_dict:
data_dict[col] = [None if x == "" or x.isspace() else x for x in data_dict[col]]
numeric_cols = [
"ra",
"dec",
"rmsRA",
"rmsDec",
"mag",
"rmsMag",
"rmsCorr",
"rmsTime",
"logSNR",
"seeing",
"exp",
]
# Do all the data conversions and then initialize the new table and concatenate
for col in numeric_cols:
if col in data_dict:
data_dict[col] = [
float(x) if x is not None else None for x in data_dict[col]
]
# Some users are accustomed to having fixed-width columns, so we strip whitespace
# from all the string columns with the exception of the 'remarks' column
string_cols = [
"provID",
"permID",
"trkSub",
"obsSubID",
"stn",
"mode",
"astCat",
"photCat",
"band",
]
for col in string_cols:
if col in data_dict:
data_dict[col] = [
x.strip() if x is not None else None for x in data_dict[col]
]
if "obsTime" in data_dict:
# Remove 'Z' from timestamps and convert to MJD
times = [t[:-1] if t is not None else None for t in data_dict["obsTime"]]
data_dict["obsTime"] = Timestamp.from_iso8601(times, scale="utc")
# Update the rmsRACosDec column name to be simply rmsRA
if "rmsRA" in data_dict:
data_dict["rmsRACosDec"] = data_dict["rmsRA"]
data_dict.pop("rmsRA")
# Only keep keys that are in ADESObservations
data_dict = {k: v for k, v in data_dict.items() if k in known_columns}
return ADESObservations.from_kwargs(**data_dict)
[docs]
def ADES_string_to_tables(
ades_string: str,
) -> Tuple[dict[str, ObsContext], ADESObservations]:
"""
Parse an ADES format string into ObsContext and ADESObservations objects.
Parameters
----------
ades_string : str
The ADES format string to parse.
Returns
-------
tuple[dict[str, ObsContext], ADESObservations]
A tuple containing:
- A dictionary mapping observatory codes to their ObsContext objects
- An ADESObservations table containing the observations
"""
# Split the string into lines and remove empty lines
lines = [line.strip() for line in ades_string.split("\n") if line.strip()]
# Start by parsing the data lines
current_data = {}
observations = ADESObservations.empty()
for line in lines:
# Skip over metadata lines
if line.startswith("#") or line.startswith("!"):
continue
# Detect if it's a header line by looking for permID, provID, or trkSub
if "permID" in line or "provID" in line or "trkSub" in line:
observations = qv.concatenate(
[observations, _data_dict_to_table(current_data)]
)
new_headers = [header.strip() for header in line.split("|")]
current_data = {header: [] for header in new_headers}
continue
# Add the data line to the current data dictionary
data_line = line.split("|")
for header, value in zip(current_data.keys(), data_line):
current_data[header].append(value)
# Add the last data dictionary to the observations table
observations = qv.concatenate([observations, _data_dict_to_table(current_data)])
# Now we parse the metadata sections
# Initialize variables
obs_contexts = {}
current_obs_context = {}
current_section_key = None
current_context_code = None
for line in lines:
if line.startswith("#"):
line = line[1:].strip()
if line.startswith("version"):
continue
current_section_key, *value = [x.strip() for x in line.split(" ")]
if current_section_key == "observatory":
# Only build obs context if current_obs_context is not empty
if current_obs_context:
obs_contexts[current_context_code] = _build_obs_context(
current_obs_context
)
current_obs_context = {}
current_context_code = None
# Some sections specify the value in the same line as the section key
if len(value) > 0:
current_obs_context[current_section_key] = " ".join(value)
continue
if line.startswith("!"):
line = line[1:].strip()
key, *value = [x.strip() for x in line.split(" ")]
value = " ".join(value)
if key == "mpcCode":
current_context_code = value
current_section = current_obs_context.setdefault(current_section_key, {})
if current_section_key in ["observers", "measurers", "comment"]:
current_key_values = current_section.setdefault(key, [])
current_key_values.append(value)
else:
current_section[key] = value
if current_obs_context:
obs_contexts[current_context_code] = _build_obs_context(current_obs_context)
return obs_contexts, observations
def _build_obs_context(context_dict: dict) -> ObsContext:
"""Helper function to build an ObsContext from parsed dictionary data."""
# Extract observatory data
observatory = ObservatoryObsContext(
mpcCode=context_dict["observatory"]["mpcCode"],
name=context_dict["observatory"].get("name"),
)
# Extract submitter data
submitter = SubmitterObsContext(
name=context_dict["submitter"]["name"],
institution=context_dict["submitter"].get("institution"),
)
# Extract telescope data
telescope_data = context_dict.get("telescope", {})
telescope = TelescopeObsContext(
name=telescope_data.get("name", None),
design=telescope_data["design"],
aperture=float(telescope_data["aperture"]),
detector=telescope_data.get("detector"),
fRatio=float(telescope_data["fRatio"]) if "fRatio" in telescope_data else None,
filter=telescope_data.get("filter"),
arraySize=telescope_data.get("arraySize"),
pixelSize=(
float(telescope_data["pixelSize"])
if "pixelSize" in telescope_data
else None
),
)
# Extract software data
software_data = context_dict.get("software", {})
software = None
if software_data:
software = SoftwareObsContext(
astrometry=software_data.get("astrometry"),
fitOrder=software_data.get("fitOrder"),
photometry=software_data.get("photometry"),
objectDetection=software_data.get("objectDetection"),
)
# Build ObsContext
return ObsContext(
observatory=observatory,
submitter=submitter,
observers=context_dict["observers"]["name"],
measurers=context_dict["measurers"]["name"],
telescope=telescope,
software=software,
fundingSource=context_dict.get("fundingSource"),
comments=context_dict.get("comment", {}).get("line", []),
)