Source code for padre_meddea.io.aws_db

"""Provides functions to upload data to the SWxSOC time series database for dashboard display"""

import astropy.units as u
import numpy as np
from astropy.time import Time
from astropy.timeseries import TimeSeries
from specutils import SpectralRegion
from swxsoc.util.util import create_annotation, record_timeseries

from padre_meddea.housekeeping.calibration import get_calibration_func
from padre_meddea.spectrum.calibration import (
    calibrate_linear_speclist,
    get_ql_calibration_file,
)
from padre_meddea.spectrum.spectrum import SpectrumList
from padre_meddea.net import PADREClient
from padre_meddea.util.util import parse_science_filename, parse_raw_meddea_filename


def record_spectra(spec_list: SpectrumList):
    """Calibrate a list of spectra and upload light curves plus a spectrogram to AWS.

    The quick-look linear calibration file is selected with the first time
    stamp of ``spec_list``. If the resulting spectrum list does not report
    itself as calibrated, nothing is recorded.

    Parameters
    ----------
    spec_list : SpectrumList
        Spectra to calibrate and record. Its ``pixel_list`` is passed on to
        the light-curve computation.
    """
    calibration_path = get_ql_calibration_file(spec_list.time[0])
    linear_params = np.load(calibration_path)
    # 2 keV wide bins starting at 5 keV; keep this to about 50 bins
    energy_axis = np.arange(5, 100, 2) * u.keV
    calibrated = calibrate_linear_speclist(
        spec_list, linear_params, spectral_axis=energy_axis
    )
    if not calibrated.calibrated:
        return

    # Energy bands (in keV) over which the light curves are computed.
    energy_bands = SpectralRegion(
        [[5, 10], [10, 15], [15, 25], [25, 50], [50, 100]] * u.keV
    )
    light_curves = calibrated.lightcurve(
        pixel_list=spec_list.pixel_list, sr=energy_bands
    )
    # Ship the spectrogram alongside the light curves in a single upload.
    light_curves["specgram"] = calibrated.spectrogram()["specgram"]
    record_timeseries(light_curves, "spectra", "meddea")
def record_photons(pkt_list, event_list):
    """Upload the photon packet time series to AWS and annotate its first time stamp.

    Parameters
    ----------
    pkt_list
        Photon packet time series. Its ``meta["ORIGFILE"]`` entry becomes the
        text of the annotation.
    event_list
        Not used by this function; kept in the signature for callers.
    """
    record_timeseries(pkt_list, "photon_pkt", "meddea")
    first_time = pkt_list.time[0]
    annotation_text = f"{pkt_list.meta['ORIGFILE']}"
    create_annotation(first_time, annotation_text, ["meta"])
def record_housekeeping(hk_ts: TimeSeries):
    """Upload a trimmed, partly calibrated copy of the housekeeping data to AWS.

    The input is left untouched. On a copy, the columns listed below are
    removed when present, and for each temperature column that is present a
    ``cal_<name>`` column holding the ``.value`` of its calibrated data is
    added. The copy is then recorded and an annotation carrying
    ``hk_ts.meta["ORIGFILE"]`` is created at its first time stamp.

    Parameters
    ----------
    hk_ts : TimeSeries
        Housekeeping time series to record.
    """
    columns_to_drop = (
        "CCSDS_APID",
        "CCSDS_VERSION_NUMBER",
        "CCSDS_PACKET_TYPE",
        "CCSDS_SECONDARY_FLAG",
        "CCSDS_SEQUENCE_FLAG",
        "CCSDS_SEQUENCE_COUNT",
        "CCSDS_PACKET_LENGTH",
        "timestamp",
        "CHECKSUM",
    )
    # calibrate hard to calibrate columns before sending
    columns_to_calibrate = ("fp_temp", "hvps_temp", "dib_temp")

    upload_ts = hk_ts.copy()
    # Membership is always tested against the unmodified input.
    input_columns = hk_ts.colnames

    for name in columns_to_drop:
        if name not in input_columns:
            continue
        upload_ts.remove_column(name)

    for name in columns_to_calibrate:
        if name not in input_columns:
            continue
        calibrate = get_calibration_func(name)
        upload_ts[f"cal_{name}"] = calibrate(hk_ts[name]).value

    record_timeseries(upload_ts, "housekeeping", "meddea")
    first_time = upload_ts.time[0]
    annotation_text = f"{hk_ts.meta['ORIGFILE']}"
    create_annotation(first_time, annotation_text, ["meta"])
def record_cmd(cmd_ts):
    """Upload the command time series to AWS and annotate its first time stamp.

    Parameters
    ----------
    cmd_ts
        Command time series. Its ``meta["ORIGFILE"]`` entry becomes the text
        of the annotation.
    """
    record_timeseries(cmd_ts, "cmd_resp", "meddea")
    first_time = cmd_ts.time[0]
    annotation_text = f"{cmd_ts.meta['ORIGFILE']}"
    create_annotation(first_time, annotation_text, ["meta"])
def record_filename(filename: str, start_time: Time, end_time: Time, level_str: str):
    """Record a file's name, time range and archive URL in the time series database.

    A single-row time series stamped with ``start_time`` is written to the
    ``files_<level_str>`` table with the columns ``end_time``, ``filename``
    and ``url``.

    Parameters
    ----------
    filename : str
        Name of the file. It is parsed to obtain the ``descriptor`` token
        used as the data type in the URL.
    start_time : Time
        Start of the time range covered by the file.
    end_time : Time
        End of the time range covered by the file; stored as a string of
        integer milliseconds.
    level_str : str
        Data level. ``"raw"`` selects the raw filename parser, any other
        value the science filename parser.
    """
    ts = TimeSeries(time=[start_time])
    # convert end time to timestream compatible timestamp, same format as start time
    # NOTE(review): to_datetime() without a timezone yields a naive datetime,
    # which timestamp() interprets as local time - confirm the host runs in
    # UTC and that record_timeseries encodes the start time the same way.
    ts["end_time"] = [str(int(end_time.to_datetime().timestamp() * 1000))]
    ts["filename"] = [filename]

    # No trailing slash here: the path components are joined with "/" below,
    # and a trailing slash would put "//" into every recorded URL.
    url_base = f"{PADREClient.baseurl}/padre/padre-meddea"

    if level_str == "raw":
        tokens = parse_raw_meddea_filename(filename)
    else:
        tokens = parse_science_filename(filename)
    data_type = tokens["descriptor"]

    # Only the first generated time path is used for the URL.
    time_path = PADREClient._generate_time_paths(start_time, end_time)
    url = f"{url_base}/{level_str}/{data_type}/{time_path[0]}"
    ts["url"] = [url]
    record_timeseries(ts, f"files_{level_str}", "meddea")