Source code for padre_meddea.housekeeping.calibration
"""Provides functions to calibrate housekeeping data"""
import astropy.units as u
import numpy as np
from astropy.io import ascii
from astropy.table import QTable, Table
from astropy.timeseries import TimeSeries
from scipy.optimize import fsolve
from .housekeeping import _data_directory
calibration_table = ascii.read(_data_directory / "padre_meddea_calib_hk_0.csv")
calibration_table.add_index("name")
[docs]
def get_calibration_func(hk_name: str) -> callable:
"""Given a housekeeping name, return a calibration function to convert from uncalibrated measurements to physical values.
Parameters
----------
hk_name: str
Returns
-------
f: callable that returns physical quantities
"""
f = _get_calibration_func(hk_name)
unit_str = calibration_table.loc[hk_name]["unit_str"]
def g(x):
return u.Quantity(f(x), unit_str)
return g
[docs]
def _get_calibration_func(hk_name: str):
"""Given a housekeeping name, return the calibration function to convert from uncalibrated data values to physical values."""
if hk_name in list(calibration_table["name"]): # is it a housekeeping
row = calibration_table.loc[hk_name]
match row["cal_func"]:
case "poly":
params = [float(p) for p in row["cal_params"].split(",")]
f = np.poly1d(params)
case "log":
params = [float(p) for p in row["cal_params"].split(",")]
def f(x):
return params[0] * np.log(x) + params[1]
case "bspline":
f = fit_bspline(hk_name)
else:
raise ValueError(f"The housekeeping name, {hk_name}, is not recognized.")
return f
[docs]
def fit_bspline(hk_name: str) -> callable:
"""
Create a B-spline interpolation function for converting ADC values to physical values.
This function retrieves calibration data for a specified housekeeping parameter
and fits a B-spline interpolation to the ADC-to-value mapping. The returned
function can be used to convert raw ADC readings to calibrated physical values
with appropriate units.
Parameters
----------
hk_name : str
Name of the housekeeping parameter for which to create the calibration function.
Returns
-------
callable
A function that takes ADC values as input and returns the corresponding
calibrated physical values as a `u.Quantity` object with appropriate units.
Notes
-----
The returned function handles unit conversion automatically based on the units
of the calibration data.
"""
from scipy.interpolate import make_interp_spline
data = get_calibration_data(hk_name)
ind = np.argsort(data["adc"])
f = make_interp_spline(data["adc"][ind], data["value"][ind].value)
def qf(x):
return u.Quantity(f(x), data["value"].unit)
return qf
[docs]
def inverse_calibrate(hk_name: str, x: u.Quantity) -> float:
"""Given a housekeeping name, return the inverse function to convert from physical values to
uncalibrated data values.
Parameters
----------
hk_name
The housekeeping name
x
The physical value
Return
------
adc
The ADC value that converts into the given physical value.
"""
f = get_calibration_func(hk_name)
def g(adc):
return (f(adc) - x).value
result = fsolve(g, 0.0)
print(f"error = {x - get_calibration_func(hk_name)(result)}")
return result
[docs]
def calibrate_hk_ts(ts: TimeSeries) -> TimeSeries:
"""Given a housekeeping timeseries, calibrate each column.
Replaces the values in the columns with calibrated values.
Columns with names that are not recognized are ignored.
New
Parameters
----------
housekeeping_ts
A timeseries or Table of uncalibrated housekeeping measurements
Returns
-------
calibrated_ts
"""
cal_ts = ts.copy()
for this_col in ts.colnames:
try:
f = get_calibration_func(this_col)
cal_ts[this_col] = f(ts[this_col])
except ValueError: # ignore columns whose names we do not recognize
pass
except KeyError:
pass
return cal_ts
[docs]
def get_calibration_data(hk_name: str) -> Table:
"""
Retrieve calibration data for a specific housekeeping parameter.
This function reads calibration data from a CSV file named after the housekeeping
parameter and returns it as an astropy QTable with ADC values and corresponding
physical values with appropriate units.
Parameters
----------
hk_name : str
Name of the housekeeping parameter for which to retrieve calibration data.
This will be used to find the corresponding CSV file.
Returns
-------
astropy.table.Table
A table containing the calibration data with columns:
- 'adc': ADC values (raw digital counts)
- 'value': Physical values with appropriate units
Notes
-----
The calibration CSV file is expected to have at least two columns:
one named 'adc' and another column whose name represents the physical unit.
"""
calibration_data_directory = _data_directory / "calibration"
data = ascii.read(calibration_data_directory / f"{hk_name}.csv")
unit_str = data.colnames[-1]
result = QTable()
result["adc"] = data["adc"]
result["value"] = u.Quantity(data[unit_str], unit_str)
return result
[docs]
def parse_error_summary(hk_ts: TimeSeries) -> TimeSeries:
"""Given a time series with an error summary column, parse it out to its individual elements.
Parameters
----------
hk_ts : TimeSeries
Returns
-------
error_summary_ts : TimeSeries
"""
if "error_summary" not in hk_ts.colnames:
raise ValueError("Missing error_summary column.")
error_ts = TimeSeries(
time=hk_ts.time, data={"error_summary": hk_ts["error_summary"]}
)
col_names = [
"uart_parity_err",
"ping1_parity_err",
"ping2_parity_err",
"hist_parity_err",
"heater_err",
"caliste_reg_err",
"caliste_seu",
"rogue_pps",
"fake_evt_flg",
"freewheel_pps",
]
for i, this_col in enumerate(col_names):
error_ts[this_col] = (error_ts["error_summary"] & 2**i) > 0
return error_ts