Source code for padre_meddea.io.file_tools

"""
Provides generic file readers.
"""

from pathlib import Path

import astropy.io.fits as fits
import astropy.units as u
import numpy as np
from astropy.table import Table
from astropy.timeseries import TimeSeries
from ccsdspy.utils import count_packets, split_by_apid
from specutils import Spectrum1D

import padre_meddea.util.util as util
from padre_meddea import APID, log
from padre_meddea.housekeeping.housekeeping import (
    clean_hk_data,
    parse_cmd_response_packets,
    parse_housekeeping_packets,
)
from padre_meddea.spectrum.raw import (
    clean_spectra_data,
    parse_ph_packets,
    parse_spectrum_packets,
)
from padre_meddea.spectrum.spectrum import PhotonList, SpectrumList

__all__ = ["read_file", "read_raw_file", "read_fits"]


[docs] def read_file(filename: Path): """ Read a file. Parameters ---------- filename: Path A file to read. Returns ------- data Examples -------- """ # TODO: the following should use parse_science_filename this_path = Path(filename) match this_path.suffix.lower(): case ".bin": # raw binary file result = read_raw_file(this_path) case ".fits": # level 0 or above result = read_fits(this_path) case ".dat": match this_path.name.lower()[0:9].lower(): case "padremda2": # summary spectrum data result = read_raw_a2(this_path) case "padremda0": # photon data result = read_raw_a0(this_path) case "padremdu8": # housekeeping and command response data result = read_raw_u8(this_path) case _: raise ValueError(f"File extension {this_path.suffix} not recognized.") return result
def read_raw_a2(filename: Path) -> SpectrumList: """ Read a raw Spectrum (A2) packet file. Parameters ---------- filename : Path A file to read Returns ------- spectra : SpectrumList """ this_path = Path(filename) raw_data = read_raw_file(this_path) pkt_spec, specs, pixel_ids = raw_data["spectra"] result = SpectrumList(pkt_spec, specs, pixel_ids) return result def read_raw_a0(filename: Path) -> PhotonList: """ Read a raw photon (A0) packet file. Parameters ---------- filename : Path A file to read Returns ------- eventlist : PhotonList """ this_path = Path(filename) raw_data = read_raw_file(this_path) pkt_list, event_list = raw_data["photons"] result = PhotonList(pkt_list, event_list) return result def read_raw_u8(filename: Path): """ Read a raw housekeeping (U8) packet file. Parameters ---------- filename : Path A file to read Returns ------- housekeeping timeseries, command timeseries """ this_path = Path(filename) raw_data = read_raw_file(this_path) hk_ts = raw_data["housekeeping"] cmd_ts = raw_data["cmd_resp"] return hk_ts, cmd_ts
[docs] def read_raw_file(filename: Path): """ Read a raw data file. Parameters --------- filename : Path A file to read Returns ------- data : dict A dictionary of data arrays. """ result = { "photons": parse_ph_packets(filename), "housekeeping": parse_housekeeping_packets(filename), "spectra": parse_spectrum_packets(filename), "cmd_resp": parse_cmd_response_packets(filename), } return result
[docs] def read_fits(filename: Path): """ Read a fits file of any level and return the appropriate data objects. """ hdul = fits.open(filename) header = hdul[0].header.copy() hdul.close() level = header["LEVEL"] data_type = header["BTYPE"] if level in ["l0", "l1"]: match data_type: case "photon": return read_fits_l0l1_photon(filename) case "housekeeping": return read_fits_l0l1_housekeeping(filename) case "spectrum": return read_fits_l0l1_spectrum(filename) case _: raise ValueError(f"Data type {data_type} is not recognized.") else: raise ValueError( f"File level, {level}, and data type, {data_type}, of {filename} not recogized." )
def read_fits_l0l1_photon(filename: Path) -> PhotonList: """ Read a level 0 photon fits file. """ event_list_table = Table.read(filename, hdu=1) ph_times = util.calc_time( event_list_table["pkttimes"], event_list_table["pktclock"], event_list_table["clocks"], ) event_list_table["time"] = ph_times event_list = TimeSeries(event_list_table) packet_list_table = Table.read(filename, hdu=2) pkt_times = util.calc_time( packet_list_table["pkttimes"], packet_list_table["pktclock"] ) packet_list_table["time"] = pkt_times packet_list = TimeSeries(packet_list_table) packet_list = util.trim_timeseries(packet_list) event_list = util.trim_timeseries(event_list) return PhotonList(packet_list, event_list) def read_fits_l0l1_housekeeping(filename: Path) -> tuple[TimeSeries, TimeSeries]: """Read a level 0 housekeeping file Returns ------- hk_ts, cmd_ts TimeSeries of housekeeping data, TimeSeries of reads """ hk_table = Table.read(filename, hdu=1) if "pkttimes" in hk_table.columns: hk_times = util.calc_time(hk_table["pkttimes"]) elif "timestamp" in hk_table.columns: hk_times = util.calc_time(hk_table["timestamp"]) hk_table["time"] = hk_times hk_ts = TimeSeries(hk_table) hk_ts = clean_hk_data(hk_ts) cmd_table = Table.read(filename, hdu=2) if len(cmd_table) == 0: log.warning(f"No command response data found in {filename}") cmd_ts = TimeSeries() else: cmd_times = util.calc_time(cmd_table["pkttimes"], cmd_table["pktclock"]) cmd_table["time"] = cmd_times cmd_ts = TimeSeries(cmd_table) cmd_ts = util.trim_timeseries(cmd_ts) return hk_ts, cmd_ts def read_fits_l0l1_spectrum(filename: Path): """Read a level 0 spectrum file. .. note:: This function is in Draft form and what it returns will likely be updated. Returns ------- timestamps, Spectrum1D array, asic_nums, pixel_nums, pixelid_strings """ pkt_table = Table.read(filename, hdu=2) pkt_times = util.calc_time(pkt_table["pkttimes"], pkt_table["pktclock"]) pkt_table["time"] = pkt_times pkt_ts = TimeSeries(pkt_table) hdu = fits.open(filename) specs = Spectrum1D( spectral_axis=np.arange(512) * u.pix, flux=hdu["spec"].data * u.ct ) # reconstruct pixel ids TODO use util.get_pixelid pixel_ids = (hdu["PKT"].data["asic"] << 5) + (hdu["PKT"].data["channel"]) + 0xCA00 pkt_ts, specs, pixel_ids = clean_spectra_data(pkt_ts, specs, pixel_ids) return SpectrumList(pkt_ts, specs, pixel_ids) def inspect_raw_file(filename: Path): """Given a raw binary file of packets, provide some high level summary information.""" with open(filename, "rb") as mixed_file: stream_by_apid = split_by_apid(mixed_file) num_packets = count_packets(filename) print(f"There are {num_packets} total packets in this file") # TODO move this message to the logger, also assumes that there are no unknown APIDs print(f"APIDs found {stream_by_apid.keys()}.") for key, val in APID.items(): if val in stream_by_apid.keys(): print( f"There are {count_packets(stream_by_apid[val])} {key} packets (APID {val})." )