Source code for padre_meddea.housekeeping.housekeeping
"""Provides functions for housekeeping data"""
import datetime
from pathlib import Path
import ccsdspy
import numpy as np
from astropy.io import ascii
from astropy.timeseries import TimeSeries
from ccsdspy import PacketField
from ccsdspy.utils import split_by_apid
import padre_meddea
from padre_meddea import APID, EPOCH, log
from padre_meddea.housekeeping.register import add_register_address_name
from padre_meddea.util.util import MIN_TIME_BAD, calc_time
# Housekeeping sub-directory of the package data directory.
_data_directory = padre_meddea._data_directory / "housekeeping"
# Table of housekeeping field definitions, read once at import time. Its
# "name" column supplies the field names used by packet_definition_hk().
hk_definitions = ascii.read(_data_directory / "hk_packet_def.csv")
# Index the table on "name" (presumably so rows can be looked up by field
# name -- confirm against callers).
hk_definitions.add_index("name")
[docs]
def parse_housekeeping_packets(filename: Path):
    """Read the housekeeping packets out of a raw file and return a timeseries.

    The file is split by APID and only the housekeeping stream is decoded.

    Parameters
    ----------
    filename : Path
        The raw file to read.

    Returns
    -------
    hk_data : astropy.timeseries.TimeSeries or None
        The decoded housekeeping fields (loaded together with the CCSDS
        primary header) indexed by packet time, with the name of the source
        file stored under ``meta["ORIGFILE"]``. None is returned when the
        file holds no housekeeping packets.
    """
    raw_path = Path(filename)
    with open(raw_path, "rb") as raw_file:
        streams = split_by_apid(raw_file)

    hk_stream = streams.get(APID["housekeeping"], None)
    if hk_stream is None:
        return None
    log.info(f"{raw_path.name}: Found housekeeping data")

    decoder = ccsdspy.FixedLength(packet_definition_hk())
    fields = decoder.load(hk_stream, include_primary_header=True)

    # pkttimes counts whole seconds elapsed since EPOCH.
    packet_times = [
        datetime.timedelta(seconds=int(seconds)) + EPOCH
        for seconds in fields["pkttimes"]
    ]

    hk_data = TimeSeries(time=packet_times, data=fields)
    hk_data.meta.update({"ORIGFILE": f"{raw_path.name}"})
    # NOTE: bad packet times are not repaired here; the series is returned
    # exactly as decoded.
    return hk_data
[docs]
def packet_definition_hk():
    """Return the packet definition for the housekeeping packets.

    Returns
    -------
    list of ccsdspy.PacketField
        A 32-bit ``pkttimes`` field, then one 16-bit unsigned field for every
        entry in the ``name`` column of ``hk_definitions`` (in table order),
        then a 16-bit ``checksum`` field.
    """
    time_field = PacketField(name="pkttimes", data_type="uint", bit_length=32)
    hk_fields = [
        PacketField(name=hk_name, data_type="uint", bit_length=16)
        for hk_name in hk_definitions["name"]
    ]
    checksum_field = PacketField(name="checksum", data_type="uint", bit_length=16)
    return [time_field, *hk_fields, checksum_field]
[docs]
def parse_cmd_response_packets(filename: Path):
    """Read the command response packets out of a raw binary file.

    The file is split by APID and only the command response stream is
    decoded. The packet is laid out as follows

    == =============================================================
    #  Description
    == =============================================================
    0  CCSDS header 1 (0x00A5)
    1  CCSDS header 2 (0b11 and sequence count)
    2  CCSDS header 3 payload size (remaining packet size - 1 octet)
    3  time_stamp_s 1
    4  time_stamp_s 2
    5  time_stamp_clocks 1
    6  time_stamp_clocks 2
    7  register address
    8  register value
    9  checksum
    == =============================================================

    Parameters
    ----------
    filename : Path
        The raw file to read.

    Returns
    -------
    cmd_resp_list : astropy.timeseries.TimeSeries or None
        The register read responses with columns ``pkttimes``, ``pktclock``,
        ``address``, ``value`` and ``seqcount``, passed through
        ``add_register_address_name``, with the name of the source file
        stored under ``meta["ORIGFILE"]``. None is returned when the file
        holds no command response packets.
    """
    raw_path = Path(filename)
    with open(raw_path, "rb") as raw_file:
        streams = split_by_apid(raw_file)

    response_stream = streams.get(APID["cmd_resp"], None)
    if response_stream is None:
        return None
    log.info(f"{raw_path.name}: Found read data")

    decoder = ccsdspy.FixedLength(packet_definition_cmd_response())
    fields = decoder.load(response_stream, include_primary_header=True)

    # Keep the payload fields under their own names and expose the CCSDS
    # sequence counter from the primary header as "seqcount".
    columns = {
        field_name: fields[field_name]
        for field_name in ("pkttimes", "pktclock", "address", "value")
    }
    columns["seqcount"] = fields["CCSDS_SEQUENCE_COUNT"]

    response_times = calc_time(fields["pkttimes"], fields["pktclock"])
    ts = add_register_address_name(TimeSeries(time=response_times, data=columns))
    ts.meta.update({"ORIGFILE": f"{raw_path.name}"})
    # NOTE: bad response times are not repaired here; the series is returned
    # exactly as decoded.
    return ts
[docs]
def packet_definition_cmd_response():
    """Return the packet definition for the register read response packets.

    Returns
    -------
    list of ccsdspy.PacketField
        Unsigned integer fields, in packet order: ``pkttimes`` (32 bit),
        ``pktclock`` (32 bit), ``address`` (16 bit), ``value`` (16 bit) and
        ``checksum`` (16 bit).
    """
    # (field name, width in bits), listed in the order they are decoded.
    layout = (
        ("pkttimes", 32),
        ("pktclock", 32),
        ("address", 16),
        ("value", 16),
        ("checksum", 16),
    )
    return [
        PacketField(name=field_name, data_type="uint", bit_length=width)
        for field_name, width in layout
    ]
[docs]
def clean_hk_data(hk_ts: TimeSeries) -> TimeSeries:
    """
    Clean the housekeeping data by replacing any bad times with times
    extrapolated from neighbouring good samples.

    A row is considered bad when its time is earlier than ``MIN_TIME_BAD``.
    Each bad row is rebuilt from the nearest good row that follows it (or,
    for bad rows after the last good one, from that last good row) by
    stepping the median cadence once per row of separation. Anchoring on a
    *good* row means runs of consecutive bad rows are repaired correctly
    instead of being derived from a neighbour that is itself bad.

    Both the ``time`` column and the raw time column (``pkttimes`` when
    present, otherwise ``timestamp``) are repaired.

    Parameters
    ----------
    hk_ts : TimeSeries
        The housekeeping data to clean. It is modified in place.

    Returns
    -------
    TimeSeries
        The cleaned housekeeping data (the same object that was passed in).
        It is returned unchanged when no row is bad, or when no good row
        exists to extrapolate from.

    Raises
    ------
    KeyError
        If the series has neither a ``pkttimes`` nor a ``timestamp`` column.
    """
    time_col_name = "pkttimes" if "pkttimes" in hk_ts.columns else "timestamp"
    raw_times = hk_ts[time_col_name]

    bad_mask = np.asarray(hk_ts.time < MIN_TIME_BAD, dtype=bool)
    if not bad_mask.any():
        return hk_ts

    good_rows = np.flatnonzero(~bad_mask)
    if good_rows.size == 0:
        # Nothing trustworthy to extrapolate from; leave the data untouched
        # rather than writing meaningless values.
        log.warning("No good housekeeping times found; times were not cleaned")
        return hk_ts

    # Cadence used for extrapolation. The median keeps the few steps that
    # touch a bad sample from skewing the estimate.
    median_dt = np.median(hk_ts.time[1:] - hk_ts.time[:-1])
    median_raw_step = np.median(raw_times[1:] - raw_times[:-1])

    last_good = good_rows.size - 1
    for bad_row in np.flatnonzero(bad_mask):
        # Position of the first good row after bad_row; clamp to the last
        # good row when the bad row lies beyond every good one.
        following = np.searchsorted(good_rows, bad_row)
        anchor = good_rows[min(following, last_good)]
        # Negative when the anchor follows the bad row, positive otherwise.
        steps = int(bad_row) - int(anchor)
        hk_ts.time[bad_row] = hk_ts.time[anchor] + median_dt * steps
        raw_times[bad_row] = raw_times[anchor] + median_raw_step * steps
    return hk_ts