Source code for padre_meddea.spectrum.calibration

"""Tools to analyze and calibrate spectral data"""

from pathlib import Path
from typing import Optional

import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
import specutils
from astropy.time import Time
from astropy.timeseries import TimeSeries
from specutils import SpectralRegion, Spectrum1D
from specutils.manipulation import extract_region

import padre_meddea.util.util as util
from padre_meddea.util.pixels import PixelList
from padre_meddea import _data_directory
from padre_meddea.spectrum.spectrum import PhotonList, SpectrumList

specutils.conf.do_continuum_function_check = False

BA_LINE_ENERGIES = [7.8, 11.8, 30.85, 35, 53.5, 57.8, 81] * u.keV


[docs] def get_calfunc_barium_rough(spec: Spectrum1D, plot: bool = False): """ Given a full range Ba-133 spectrum, return a rough linear calibration function by finding and fitting only the two strongest lines (30.85 keV, 81 keV). It does this by splitting the spectrum into two regions and finding the maximum value in those regions. The saturated values at high energies are ignored. Parameters ---------- spec : Spectrum1D The Ba-133 spectrum plot : bool If True, then display a plot of the spectrum with the line peaks found. Returns ------- np.poly1d linear fit """ # the two strongest lines in the spectrum 30.85, 81 line_energies = u.Quantity([BA_LINE_ENERGIES[2], BA_LINE_ENERGIES[-1]]) line_centers = np.zeros(len(line_energies)) # split the spectrum into two regions, ignore the top end which includes saturated events. region_edges_percent = [0.17, 0.48, 0.73] region_edges_spec = np.floor(region_edges_percent * spec.spectral_axis.max()) spec_regions = SpectralRegion( [ [region_edges_spec[0], region_edges_spec[1]], [region_edges_spec[1], region_edges_spec[2]], ] ) for i, (this_energy, this_region) in enumerate(zip(line_energies, spec_regions)): sub_spec = extract_region(spec, this_region) mind = np.argmax(sub_spec.data) line_centers[i] = sub_spec.spectral_axis[mind].value # fit rough calibration using just these two lines p = np.polyfit(line_energies.value, line_centers, 1) f = np.poly1d(p) if plot: plt.plot(spec.spectral_axis, spec.flux) for this_line in line_centers: plt.axvline(this_line) return f
[docs] def fit_peak_parabola(spec: Spectrum1D) -> float: """Given a spectral region with a single line, fit a parabola to the peak and return the position of the maximum Parameters ---------- spec : Spectrum1D Returns ------- peak_center : float """ x = spec.spectral_axis.value y = spec.flux.value max_ind = np.argmax(y) # TODO add edge case for max at index value 0 or max index fit_x = [x[max_ind - 1], x[max_ind], x[max_ind + 1]] fit_y = [y[max_ind - 1], y[max_ind], y[max_ind + 1]] p = np.polyfit(fit_x, fit_y, 2) fit_peak = -p[1] / (2.0 * p[0]) return fit_peak
[docs] def fit_peaks( spec: Spectrum1D, line_centers: u.Quantity, plot: bool = False, fit_func="parabola", window=30, ) -> u.Quantity: """ Given a spectrum and a set of approximate peak or line centers, perform a fit for each and return the fitted peak location for each. Parameters ---------- spec : Spectrum1D The input spectrum line_centers : u.Quantity The approximate location of the line peaks plot : bool If True, then plot the data and fit for each line center region. fit_func : str The fit function for finding the peak value. window : int Number of points to consider around the line center Returns ------- fit_centers """ fit_centers = line_centers.copy() spec_units = line_centers[0].unit fit_window_halfwidth = window * spec_units for i, this_line in enumerate(line_centers): line_region = SpectralRegion( this_line - fit_window_halfwidth, this_line + fit_window_halfwidth ) sub_spec = extract_region(spec, line_region) fit_centers[i] = fit_peak_parabola(sub_spec) * spec_units if plot: plt.figure() plt.plot(sub_spec.spectral_axis.value, sub_spec.flux, label="data") plt.axvline(this_line.value, color="green", label="line_center") plt.axvline(fit_centers[i].value, color="red", label="fit peak") plt.legend() plt.show() return fit_centers
[docs] def calibrate_phlist_barium_linear(ph_list: PhotonList, plot: bool = False): """Given a PhotonList of a Ba-133 spectrum, perform a linear energy calibration for all detectors and pixels. Parameters ---------- ph_list: PhotonList Returns ------- lin_cal_params[num_asics,num_pixels,2] An array of linear calibration values for each pixel. """ spec_bins = np.arange(0, 4097, 8, dtype=np.uint16) * u.pix lin_cal_params = np.zeros((4, 12, 2)) all_pixels = PixelList.all() for i, this_pixel in enumerate(all_pixels): # fitting barium lines this_spec = ph_list.spectrum(pixel_list=this_pixel, bins=spec_bins) f = get_calfunc_barium_rough(this_spec) ba_line_centers = f(BA_LINE_ENERGIES.value) fit_line_centers = fit_peaks( this_spec, u.Quantity(ba_line_centers, this_spec.spectral_axis.unit) ) if plot: plt.figure() plt.plot(this_spec.spectral_axis.value, this_spec.flux.value) for this_line, that_line in zip(fit_line_centers, ba_line_centers): plt.axvline(this_line, color="red", label="fit") plt.axvline(that_line, color="green", label="rough") plt.title(f"{this_spec['label'].value}") plt.show() # if this_pixel > 8: # small pixel, remove the weak escape lines # x = [fit_line_centers[0], fit_line_centers[1], fit_line_centers[-1]] # y = [line_energies[0].value, line_energies[1].value, line_energies[-1].value] # else: x = fit_line_centers y = BA_LINE_ENERGIES.value p = np.polyfit(x, y, 1) f = np.poly1d(p) if plot: plt.figure() plt.plot(x, y, "x") plt.plot(x, f(x.value)) plt.title(f"{this_spec['label'].value}") plt.show() lin_cal_params[this_pixel["asic"], this_pixel["pixel"], :] = p return lin_cal_params
[docs] def calibrate_speclist_barium_linear(spec_list: SpectrumList, plot: bool = False): """Given a PhotonList of a Ba-133 spectrum, perform a linear energy calibration for all detectors and pixels. Parameters ---------- ph_list: PhotonList Returns ------- lin_cal_params[num_asics,num_pixels,2] An array of linear calibration values for each pixel. """ lin_cal_params = np.zeros((24, 2)) for pixel_index, this_pixel in enumerate(spec_list.pixel_list): # fitting barium lines this_spec = spec_list.spectrum(pixel_list=this_pixel) f = get_calfunc_barium_rough(this_spec) # TODO: test lines for flux STRONG_BA_LINE_ENERGIES = [7.8, 30.85, 35, 81] * u.keV ba_line_centers = f(STRONG_BA_LINE_ENERGIES.value) fit_line_centers = fit_peaks( this_spec, u.Quantity(ba_line_centers, this_spec.spectral_axis.unit), window=5, ) if plot: plt.figure() plt.plot(this_spec.spectral_axis.value, this_spec.flux.value) for this_line, that_line in zip(fit_line_centers, ba_line_centers): plt.axvline(this_line, color="red", label="fit") plt.axvline(that_line, color="green", label="rough") plt.title(this_pixel["label"]) plt.legend() plt.show() # if this_pixel > 8: # small pixel, remove the weak escape lines # x = [fit_line_centers[0], fit_line_centers[1], fit_line_centers[-1]] # y = [line_energies[0].value, line_energies[1].value, line_energies[-1].value] # else: x = fit_line_centers y = STRONG_BA_LINE_ENERGIES.value p = np.polyfit(x, y, 1) f = np.poly1d(p) if plot: plt.figure() plt.plot(x, y, "x") plt.plot(x, f(x.value)) plt.show() lin_cal_params[pixel_index, :] = p return lin_cal_params
[docs] def calibrate_linear_phlist( ph_list: PhotonList, lin_cal_params: np.array ) -> PhotonList: """Given an uncalibrated PhotonList and a complete set of linear calibration parameters produced by calibrate_phlist_barium_linear, apply the calibration to the PhotonList. Adds a new energy column. Paramters --------- Uncalibrated PhotonList Linear calibration parameter array Returns ------- calibrated PhotonList """ ph_list.event_list["energy"] = np.zeros(len(ph_list.event_list["atod"])) for this_asic in range(4): for this_pixel in range(12): ind = (ph_list.event_list["asic"] == this_asic) * ( ph_list.event_list["pixel"] == this_pixel ) cal_func = np.poly1d(lin_cal_params[this_asic, this_pixel, :]) ph_list.event_list["energy"][ind] = cal_func( ph_list.event_list["atod"][ind] ) return ph_list
[docs] def calibrate_linear_speclist( spec_list: SpectrumList, lin_cal_params: np.ndarray, spectral_axis: Optional[np.ndarray] = None, ) -> SpectrumList: """Given an uncalibrated SpectrumList and a complete set of linear calibration parameters produced by calibrate_phlist_barium_linear, apply the calibration to the PhotonList. Adds a new energy column. Parameters --------- spec_list: SpectrumList Uncalibrated SpectrumList lin_cal_params: np.ndarray Linear calibration parameter array spectral_axis: np.ndarray The new energy axis to interpolate the spectra onto. Returns ------- calibrated SpectrumList """ from scipy.interpolate import RectBivariateSpline if spectral_axis is None: spectral_axis = np.arange(0, 100, 0.1) * u.keV num_ts = spec_list.specs.shape[0] new_spec_data = np.zeros((num_ts, 24, len(spectral_axis))) this_y = (spec_list.time - spec_list.time[0]).to("s").value for i in range(24): f = np.poly1d(lin_cal_params[i, :]) this_x = f(spec_list.specs[0, i].spectral_axis.value) z = spec_list.specs[:, i].data f2d = RectBivariateSpline(this_x, this_y, z.T) new_spec_data[:, i, :] = f2d(spectral_axis.value, this_y).T specs = Spectrum1D(spectral_axis=spectral_axis, flux=new_spec_data * u.ct) new_spec_list = SpectrumList( spec_list.pkt_list, specs, pixel_ids=spec_list._pixel_ids ) return new_spec_list
[docs] def get_ql_calibration_file(this_time: Time) -> Path: """Given a time return a quicklook calibration file.""" file_directory = _data_directory / "science" / "calibration" / "quicklook" file_list = list(file_directory.glob("*.npy")) file_table = TimeSeries( time=Time([util.get_file_time(this_file.name) for this_file in file_list]), data={"filename": [this_file.name for this_file in file_list]}, ) ind = file_table.time <= this_time if np.any(ind): return file_directory / file_table[ind][-1]["filename"] else: raise FileNotFoundError(f"No calibration file valid for time {this_time}")