Source code for padre_meddea.calibration.calibration

"""
A module for all things calibration.
"""

from pathlib import Path

from astropy.io import fits
from astropy.table import Table

import padre_meddea
import padre_meddea.io.aws_db as aws_db
import padre_meddea.util.pixels as pixels
from padre_meddea import log
from padre_meddea.io import file_tools
from padre_meddea.io.fits_tools import get_comment, get_obs_header, get_primary_header
from padre_meddea.util import validation
from padre_meddea.util.util import (
    calc_time,
    create_meddea_filename,
)

__all__ = [
    "process_file",
]


[docs] def process_file(filename: Path, overwrite=False) -> list: """ This is the entry point for the pipeline processing. It runs all of the various processing steps required. Parameters ---------- data_filename: str Fully specificied filename of an input file Returns ------- output_filenames: list Fully specificied filenames for the output files. """ log.info(f"Processing file {filename}.") output_files = [] file_path = Path(filename) if file_path.suffix.lower() in [".bin", ".dat"]: # raw file # Before we process, validate the file with CCSDS custom_validators = [ validation.validate_packet_checksums, lambda f: validation.validate_file_size(f, size_limit=10500000), # 10 MB ] validation_findings = validation.validate( file_path, valid_apids=list(padre_meddea.APID.values()), custom_validators=custom_validators, ) for finding in validation_findings: log.warning(f"Validation Finding for file : {filename} : {finding}") parsed_data = file_tools.read_raw_file(file_path) # Prepare Metadata for Output Files Naming and Headers test_flag = False input_level_str = "raw" level_str = "l0" # Look for each kind of data in the parsed packets if parsed_data["photons"] is not None: # we have event list data # Set Data Type for L0 Data data_type = "photon" pkt_list, event_list = parsed_data["photons"] log.info( f"Found photon data, {len(event_list)} photons and {len(pkt_list)} packets." ) event_list = Table(event_list) event_list.remove_column("time") # Get FITS Primary Header Template primary_hdr = get_primary_header( file_path, data_level=level_str, data_type=data_type ) for this_keyword in ["DATE-BEG", "DATE-END", "DATE-AVG"]: primary_hdr[this_keyword] = ( event_list.meta.get(this_keyword, ""), get_comment(this_keyword), ) primary_hdr["DATEREF"] = (primary_hdr["DATE-BEG"], get_comment("DATEREF")) path = create_meddea_filename( time=primary_hdr["DATE-BEG"], level=level_str, descriptor=data_type, test=test_flag, overwrite=overwrite, ) primary_hdr["FILENAME"] = (path.name, get_comment("FILENAME")) empty_primary_hdu = fits.PrimaryHDU(header=primary_hdr) # PKT HDU # record originating filename aws_db.record_filename( file_path.name, pkt_list.time[0], pkt_list.time[-1], level_str=input_level_str, ) # record output filename aws_db.record_filename( path.name, pkt_list.time[0], pkt_list.time[-1], level_str=level_str ) pkt_list = Table(pkt_list) pkt_list.remove_column("time") # PKT Header pkt_header = get_obs_header(data_level=level_str, data_type=data_type) pkt_header["DATE-BEG"] = ( event_list.meta.get("DATE-BEG", ""), get_comment("DATE-BEG"), ) pkt_header["DATEREF"] = ( event_list.meta.get("DATE-BEG", ""), get_comment("DATEREF"), ) pkt_header["FILENAME"] = (path.name, get_comment("FILENAME")) pkt_hdu = fits.BinTableHDU(pkt_list, header=pkt_header, name="PKT") pkt_hdu.add_checksum() # SCI HDU hit_header = get_obs_header(data_level=level_str, data_type=data_type) hit_header["DATE-BEG"] = ( event_list.meta.get("DATE-BEG", ""), get_comment("DATE-BEG"), ) hit_header["DATEREF"] = ( event_list.meta.get("DATE-BEG", ""), get_comment("DATEREF"), ) hit_header["FILENAME"] = (path.name, get_comment("FILENAME")) hit_hdu = fits.BinTableHDU(event_list, header=hit_header, name="SCI") hit_hdu.add_checksum() hdul = fits.HDUList([empty_primary_hdu, hit_hdu, pkt_hdu]) # Write the file, with the overwrite option controlled by the environment variable hdul.writeto(path, overwrite=overwrite, checksum=True) hdul.close() # Store the output file path in a list output_files.append(path) if parsed_data["housekeeping"] is not None: # Set Data Type for L0 Data data_type = "housekeeping" hk_data = parsed_data["housekeeping"] # send data to AWS Timestream for Grafana dashboard aws_db.record_housekeeping(hk_data) hk_table = Table(hk_data) # Get FITS Primary Header Template primary_hdr = get_primary_header( file_path, data_level=level_str, data_type=data_type ) date_beg = calc_time(hk_data["pkttimes"][0]) date_end = calc_time(hk_data["pkttimes"][-1]) primary_hdr["DATE-BEG"] = (date_beg.fits, get_comment("DATE-BEG")) primary_hdr["DATEREF"] = (date_beg.fits, get_comment("DATEREF")) hk_table["seqcount"] = hk_table["CCSDS_SEQUENCE_COUNT"] colnames_to_remove = [ "CCSDS_VERSION_NUMBER", "CCSDS_PACKET_TYPE", "CCSDS_SECONDARY_FLAG", "CCSDS_SEQUENCE_FLAG", "CCSDS_APID", "CCSDS_SEQUENCE_COUNT", "CCSDS_PACKET_LENGTH", "CHECKSUM", "time", ] for this_col in colnames_to_remove: if this_col in hk_table.colnames: hk_table.remove_column(this_col) path = create_meddea_filename( time=date_beg, level=level_str, descriptor=data_type, test=test_flag, overwrite=overwrite, ) primary_hdr["FILENAME"] = (path.name, get_comment("FILENAME")) # record originating filename aws_db.record_filename( file_path.name, date_beg, date_end, level_str=input_level_str ) # record output filename aws_db.record_filename(path.name, date_beg, date_end, level_str=level_str) empty_primary_hdu = fits.PrimaryHDU(header=primary_hdr) # Create HK HDU hk_header = get_obs_header(data_level=level_str, data_type=data_type) hk_header["DATE-BEG"] = (date_beg.fits, get_comment("DATE-BEG")) hk_header["DATEREF"] = (date_beg.fits, get_comment("DATEREF")) hk_header["FILENAME"] = (path.name, get_comment("FILENAME")) hk_hdu = fits.BinTableHDU(data=hk_table, header=hk_header, name="HK") hk_hdu.add_checksum() # add command response data if it exists in the same fits file cmd_header = get_obs_header(data_level=level_str, data_type=data_type) cmd_header["FILENAME"] = (path.name, get_comment("FILENAME")) if parsed_data["cmd_resp"] is not None: data_ts = parsed_data["cmd_resp"] cmd_header["DATE-BEG"] = ( data_ts.time[0].fits, get_comment("DATE-BEG"), ) cmd_header["DATEREF"] = ( data_ts.time[0].fits, get_comment("DATEREF"), ) aws_db.record_cmd(data_ts) data_table = Table(data_ts) colnames_to_remove = [ "CCSDS_VERSION_NUMBER", "CCSDS_PACKET_TYPE", "CCSDS_SECONDARY_FLAG", "CCSDS_SEQUENCE_FLAG", "CCSDS_APID", "CCSDS_SEQUENCE_COUNT", "CCSDS_PACKET_LENGTH", "CHECKSUM", "time", ] for this_col in colnames_to_remove: if this_col in hk_table.colnames: data_table.remove_column(this_col) cmd_hdu = fits.BinTableHDU( data=data_table, header=cmd_header, name="READ" ) cmd_hdu.add_checksum() else: # if None still end an empty Binary Table cmd_hdu = fits.BinTableHDU(data=None, header=cmd_header, name="READ") hdul = fits.HDUList([empty_primary_hdu, hk_hdu, cmd_hdu]) hdul.writeto(path, overwrite=overwrite, checksum=True) hdul.close() output_files.append(path) if parsed_data["spectra"] is not None: from padre_meddea.spectrum.spectrum import SpectrumList # Set Data Type for L0 Data data_type = "spectrum" # TODO check that asic_nums and channel_nums do not change # the function below will remove any change in pixel ids pkt_ts, specs, pixel_ids = parsed_data["spectra"] ts, spectra, ids = file_tools.clean_spectra_data(pkt_ts, specs, pixel_ids) asic_nums, channel_nums = pixels.parse_pixelids(ids) # Get FITS Primary Header Template primary_hdr = get_primary_header( file_path, data_level=level_str, data_type=data_type ) dates = { "DATE-BEG": ts.time[0].fits, "DATE-END": ts.time[-1].fits, "DATE-AVG": ts.time[len(ts.time) // 2].fits, } primary_hdr["DATEREF"] = (dates["DATE-BEG"], get_comment("DATEREF")) for this_keyword, value in dates.items(): primary_hdr[this_keyword] = ( value, get_comment(this_keyword), ) path = create_meddea_filename( time=dates["DATE-BEG"], level=level_str, descriptor=data_type, test=test_flag, overwrite=overwrite, ) # record originating filename aws_db.record_filename( file_path.name, ts.time[0], ts.time[-1], level_str=input_level_str ) # record output filename aws_db.record_filename( path.name, ts.time[0], ts.time[-1], level_str=level_str ) primary_hdr["FILENAME"] = (path.name, get_comment("FILENAME")) # Spectrum HDU spec_header = get_obs_header(data_level=level_str, data_type=data_type) spec_header["DATE-BEG"] = (primary_hdr["DATE-BEG"], get_comment("DATE-BEG")) spec_header["DATEREF"] = (primary_hdr["DATE-BEG"], get_comment("DATEREF")) spec_header["FILENAME"] = (path.name, get_comment("FILENAME")) spec_hdu = fits.CompImageHDU( data=spectra.data, header=spec_header, name="SPEC", compression_type="GZIP_1", ) # NOTE: CompImageHDU does not support add_checksum, so we add checksum to the HDUList later data_table = Table() data_table["pkttimes"] = ts["pkttimes"] data_table["pktclock"] = ts["pktclock"] data_table["asic"] = asic_nums data_table["channel"] = channel_nums data_table["seqcount"] = ts["seqcount"] pkt_header = get_obs_header(data_level=level_str, data_type=data_type) pkt_header["DATE-BEG"] = (primary_hdr["DATE-BEG"], get_comment("DATE-BEG")) pkt_header["DATEREF"] = (primary_hdr["DATE-BEG"], get_comment("DATEREF")) pkt_header["FILENAME"] = (path.name, get_comment("FILENAME")) pkt_hdu = fits.BinTableHDU(data=data_table, header=pkt_header, name="PKT") pkt_hdu.add_checksum() empty_primary_hdu = fits.PrimaryHDU(header=primary_hdr) hdul = fits.HDUList([empty_primary_hdu, spec_hdu, pkt_hdu]) hdul.writeto(path, overwrite=overwrite, checksum=True) hdul.close() # calibrate to ql data and send to AWS # spec_list = file_tools.read_file(path) spec_list = SpectrumList(ts, spectra, ids) aws_db.record_spectra(spec_list) output_files.append(path) # add other tasks below return output_files