Source code for padre_meddea.spectrum.spectrum

"""
Provides data containers for both summary spectra and event list or photon data.
"""

import astropy.units as u
import numpy as np
from astropy.nddata import StdDevUncertainty
from astropy.table import Table, vstack
from astropy.time import Time
from astropy.timeseries import BinnedTimeSeries, TimeSeries, aggregate_downsample
from specutils import SpectralRegion, Spectrum1D

from padre_meddea.util.pixels import PixelList, get_pixelid

DEFAULT_SPEC_PIXEL_IDS = np.array(
    [
        51738,
        51720,
        51730,
        51712,
        51733,
        51715,
        51770,
        51752,
        51762,
        51744,
        51765,
        51747,
        51802,
        51784,
        51794,
        51776,
        51797,
        51779,
        51834,
        51816,
        51826,
        51808,
        51829,
        51811,
    ],
    dtype=np.uint16,
)
MAX_PH_DATA_RATE = 100 * u.kilobyte / u.s

DEFAULT_SPEC_PIXEL_LIST = PixelList(pixelids=DEFAULT_SPEC_PIXEL_IDS)

__all__ = [
    "PhotonList",
    "SpectrumList",
]


[docs] class PhotonList: """Data container for MeDDEA photon or event list data Parameters ---------- pkt_list : TimeSeries The time series of photon packet header data. event_list : TimeSeries The time series of event data Examples -------- >>> from padre_meddea.io import read_file >>> from padre_meddea.util.pixels import PixelList >>> ph_list = read_file("padre_meddea_l0test_photons_20250504T070411_v0.1.0.fits") # doctest: +SKIP >>> this_spectrum = ph_list.spectrum(pixel_list=ph_list.pixel_list) # doctest: +SKIP """ def __init__(self, pkt_list: TimeSeries, event_list: TimeSeries): self.data = {"event_list": event_list, "pkt_list": pkt_list} self.event_list = event_list self.pkt_list = pkt_list def __getitem__(self, key): if isinstance(key, int): return self.event_list[key] elif isinstance(key, slice): if isinstance(key.start, Time) and isinstance(key.stop, Time): pkt_ind = (self.pkt_list.time > key.start) * ( self.pkt_list.time < key.stop ) ph_ind = (self.event_list.time > key.start) * ( self.event_list.time < key.stop ) return type(self)(self.pkt_list[pkt_ind], self.event_list[ph_ind]) return self def __str__(self): return f"{self._text_summary()}{self.data.__repr__()}" def __repr__(self): return f"{object.__repr__(self)}\n{self}"
[docs] def _text_summary(self): num_events = len(self.data["event_list"]) result = f"PhotonList ({num_events:,} events)\n" if num_events > 0: dt = self.data["event_list"].time[-1] - self.data["event_list"].time[0] dt.format = "quantity_str" if dt < (1 * u.day): result += f"{self.data['event_list'].time[0]} - {str(self.data['event_list'].time[-1])[11:]} ({dt})\n" else: result += f"{self.data['event_list'].time[0]} - {self.data['event_list'].time[-1]} ({dt})\n" return result
def __add__(self, other: PixelList): event_list = vstack([self.event_list, other.event_list]) pkt_list = vstack([self.pkt_list, other.pkt_list]) return type(self)(pkt_list, event_list) @property def calibrated(self): if "energy" in self.event_list.colnames: return True else: return False @property def pixel_list(self) -> PixelList: """Return the set of pixels that have events""" # note this is calculated on the fly instead of at init because it can take a few seconds to compute for large event lists pixel_ids = np.unique( get_pixelid(self.event_list["asic"], self.event_list["pixel"]) ) return PixelList(pixelids=pixel_ids)
[docs] def spectrum( self, pixel_list: PixelList, bins=None, baseline_sub: bool = False, calibrate: bool = False, ) -> Spectrum1D: """ Create a spectrum Parameters ---------- pixel_list : PixelList A list of one or more pixels bins : np.array The bin edges for the spectrum (see ~np.histogram). If None, then uses np.arange(0, 2**12 - 1) baseline_sub : bool If True, then baseline measurements are subtracted if they exist Note: not yet implemented. calibrate : bool If True, provide the calibrated spectrum Returns ------- spectrum : Spectrum1D """ if not calibrate and bins is None: bins = np.arange(0, 2**12 - 1) * u.pix if calibrate and bins is None: bins = np.arange(0, 100, 0.1) * u.keV this_event_list = self._slice_event_list_pixels(pixel_list) if calibrate: hit_energy = this_event_list["energy"] else: hit_energy = this_event_list["atod"] data, new_bins = np.histogram(hit_energy, bins=bins.value) # for Spectrum1D, the spectral axis is at the center of the bins # TODO: the histogram results are not consistent with the above result = Spectrum1D( flux=u.Quantity(data, "count"), spectral_axis=bins, uncertainty=StdDevUncertainty(np.sqrt(data) * u.count), ) return result
[docs] def lightcurve( self, pixel_list: PixelList, int_time: u.Quantity[u.s], sr: SpectralRegion, step: int = 10, ) -> TimeSeries: """ Create a light curve Parameters ---------- pixel_list : PixelList The pixels to integrate over int_time : u.Quantity[u.s] The integration time for each time step sr : SpectralRegion The spectral region(s) to integrate over step : int To speed up processing, skip every `step` photons. Default is ten. The light curve count rate is corrected by multiplying by `step`. Returns ------- lc : TimeSeries """ this_event_list = self._slice_event_list_pixels(pixel_list) # downsample the event list this_event_list = TimeSeries(time=self.event_list.time[::step]) for this_sr in sr: this_event_list = self._slice_event_list_sr(sr) col_label = f"{this_sr.lower}-{this_sr.upper}_cts" this_event_list[col_label] = np.ones(len(this_event_list)) ts = aggregate_downsample( this_event_list, time_bin_size=int_time, aggregate_func=np.sum ) ts[col_label] *= step return ts
[docs] def data_rate(self) -> BinnedTimeSeries: """Return a BinnedTimeseries of the data rate. Returns ------- data_rate : BinnedTimeSeries """ # correct the ccsds packet length by adding ccsds header and adding missing 1 pkt_length = (self.pkt_list["pktlength"] + 3 * 2 + 1) * u.byte good_times = ( self.pkt_list.time > self.pkt_list.time[0] ) # to protect against bad times data_rate = TimeSeries( time=self.pkt_list.time[good_times], data={"packet_size": pkt_length[good_times]}, ) data_rate_ts = aggregate_downsample( data_rate, time_bin_size=1 * u.s, aggregate_func=np.sum ) data_rate_ts.rename_column("packet_size", "data_rate") data_rate_ts["data_rate"] = data_rate_ts["data_rate"] / u.s return data_rate_ts
[docs] def _slice_event_list_pixels(self, pixel_list: PixelList) -> TimeSeries: """Slice the event list to only contain events from asic_num and pixel_num""" ind = np.zeros(len(self.event_list)) if isinstance(pixel_list, Table.Row): ind = np.logical_or( ind, (self.event_list["pixel"] == int(pixel_list["pixel"])) * (self.event_list["asic"] == int(pixel_list["asic"])), ) else: for this_pixel in pixel_list: ind = np.logical_or( ind, (self.event_list["pixel"] == int(this_pixel["pixel"])) * (self.event_list["asic"] == int(this_pixel["asic"])), ) return self.event_list[ind]
[docs] def _slice_event_list_sr(self, sr: SpectralRegion): """Slice the envt list to only contain events inside the spectral region.""" if len(sr) > 1: raise ValueError("Only supports Spectral Regions of length 1.") if sr[0].lower.unit == u.Unit("keV"): data = self.event_list["energy"] * u.pix elif sr[0].lower.unit == u.Unit("pix"): data = self.event_list["atod"] * u.pix else: raise ValueError( f"Unit of Spectral Region, {sr[0].lower.unit}, not recognized." ) ind = (data > sr[0].lower) * (data < sr[0].upper) return self.event_list[ind]
[docs] class SpectrumList: """ A data container for MeDDEA summary spectrum data Parameters ---------- pkt_spec : TimeSeries The time series of spectrum packet header data. specs : Spectrum1D The spectrum cube pixel_ids : np.array The pixel id array Raises ------ ValueError If pixel arrays are found to change. Examples -------- >>> from padre_meddea.io import read_file >>> from astropy.time import Time >>> spec_list = read_file("padre_meddea_l0test_spectrum_20250504T070411_v0.1.0.fits") # doctest: +SKIP >>> this_spectrum = spec_list.spectrum(pixel_list=spec_list.pixel_list) # doctest: +SKIP """ def __init__(self, pkt_list: TimeSeries, specs, pixel_ids): self.bins = np.arange(0, 4097, 8, dtype=np.uint16) self.time = pkt_list.time self.data = {"pkt_list": pkt_list, "specs": specs, "pixel_ids": pixel_ids} self.pkt_list = self.data["pkt_list"] self.specs = self.data["specs"] self._pixel_ids = self.data["pixel_ids"] if len(np.unique(pixel_ids)) > 24: print("Found too many unique pixel IDs.") print("Forcing to default set") self.pixel_list = PixelList(pixelids=DEFAULT_SPEC_PIXEL_IDS) else: if np.all(np.unique(pixel_ids) == sorted(pixel_ids[0, :])): self.pixel_list = PixelList( pixelids=np.median(pixel_ids, axis=0).astype("uint16") ) else: raise ValueError("Found change in pixel ids") self.index = len(pkt_list) @property def calibrated(self): if self.specs[0, 0].spectral_axis.unit == u.Unit("keV"): return True else: return False def __str__(self): return f"{self._text_summary()}{self.data['specs'].__repr__()}" def __repr__(self): return f"{object.__repr__(self)}\n{self}"
[docs] def _text_summary(self): dt = self.time[-1] - self.time[0] dt.format = "quantity_str" result = f"SpectrumList ({self.specs.shape[0]:,} spectra, {int(np.sum(self.specs.data)):,} events)\n" if dt < (1 * u.day): result += f"{self.time[0]} - {str(self.time[-1])[11:]} ({dt})\n" else: result += f"{self.time[0]} - {self.time[-1]} ({dt})\n" return result
[docs] def spectrum(self, pixel_list: PixelList): """Create a spectrum, integrates over all times Parameters ---------- asic_num : int The asic or detector number (0 to 3) pixel_num : int The pixel number (0 to 11) or spec_index : int The spectrum index from 0 to 23 Raises ------ ValueError If the selected asic_num and pixel_num are not found in the spectra Returns ------- spectrum : Spectrum1D """ flux = np.zeros([self.specs.data.shape[2]]) if isinstance(pixel_list, Table.Row): if pixel_list in self.pixel_list: pixel_index = np.where(pixel_list == self.pixel_list)[0][0] flux += np.sum(self.specs.data[:, pixel_index, :], axis=0) else: for this_pixel in pixel_list: if this_pixel in self.pixel_list: pixel_index = np.where(this_pixel == self.pixel_list)[0][0] flux += np.sum(self.specs.data[:, pixel_index, :], axis=0) # the spectral axis is at the center of the bins result = Spectrum1D( flux=flux * self.specs[0, 0].flux.unit, spectral_axis=self.specs[0, 0].spectral_axis, uncertainty=StdDevUncertainty(np.sqrt(flux) * u.count), ) return result
[docs] def lightcurve(self, pixel_list: PixelList, sr: SpectralRegion) -> TimeSeries: """ Create a light curve Parameters ---------- pixel_index : int The pixels to integrate over sr : SpectralRegion The spectral region(s) to integrate over Returns ------- lc : TimeSeries """ lc = TimeSeries(time=self.time) flux = np.zeros([self.specs.data.shape[0], self.specs.data.shape[2]]) if isinstance(pixel_list, Table.Row): if pixel_list in self.pixel_list: pixel_index = np.where(pixel_list == self.pixel_list)[0][0] flux += self.specs.data[:, pixel_index, :] else: for this_pixel in pixel_list: if this_pixel in self.pixel_list: pixel_index = np.where(this_pixel == self.pixel_list)[0][0] flux += self.specs.data[:, pixel_index, :] for i, this_sr in enumerate(sr): if str(self.specs.spectral_axis.unit) != str(this_sr.lower.unit): raise ValueError( f"Units of spectral axis ({self.specs.spectral_axis.unit}) does not match units of sr ({this_sr.lower.unit})" ) this_flux = flux.copy() ind = (self.specs.spectral_axis > this_sr.lower) * ( self.specs.spectral_axis < this_sr.upper ) this_flux[:, ~ind] = 0 col_label = f"{this_sr.lower.value:0.0f}to{this_sr.upper.value:0.0f}_{this_sr.upper.unit}" total_cts = np.sum(this_flux, axis=1) lc[col_label] = total_cts return lc
[docs] def spectrogram(self): specgram = np.sum(self.specs.data, axis=1) * u.ct ts = TimeSeries(time=self.time, data={"specgram": specgram}) ts.meta["spectral_axis"] = u.Quantity(self.specs.spectral_axis) return ts
[docs] def plot_spectrogram(self, **imshow_kwargs): """Plot a spectrogram""" import matplotlib.dates as mdates import matplotlib.pyplot as plt ts = [mdates.date2num(this_time) for this_time in self.time.to_datetime()] x_lims = [ts[0], ts[-1]] y_lims = [ self.specs[0, 0].spectral_axis[0].value, self.specs[0, 0].spectral_axis[-1].value, ] fig, ax = plt.subplots() # TODO use spectrogram function above specgram = np.sum(self.specs.data, axis=1) ax.imshow( specgram.transpose(), origin="lower", interpolation="nearest", extent=[x_lims[0], x_lims[1], y_lims[0], y_lims[1]], **imshow_kwargs, ) date_format = mdates.DateFormatter("%H:%M:%S") ax.xaxis.set_major_formatter(date_format) # This simply sets the x-axis data to diagonal so it fits better. fig.autofmt_xdate() plt.show()
def __getitem__(self, key): if isinstance(key, int): return self.specs[key] elif isinstance(key, slice): if isinstance(key.start, Time) and isinstance(key.stop, Time): ind = (self.time > key.start) * (self.time < key.stop) return type(self)( self.pkt_list[ind], self.specs[ind, :, :], self._pixel_ids ) return self