Source code for padre_meddea.util.util

"""
Provides general utility functions.
"""

import csv
import os
import re
import tempfile
from pathlib import Path

import astropy.units as u
import numpy as np
from astropy.time import Time, TimeDelta
from astropy.timeseries import TimeSeries
from ccsdspy.utils import split_by_apid, split_packet_bytes
from sunpy.net.attr import AttrAnd
from swxsoc.util import (
    Descriptor,
    DevelopmentBucket,
    Instrument,
    Level,
    SearchTime,
    SWXSOCClient,
    create_science_filename,
    parse_science_filename,
)

import padre_meddea
from padre_meddea import APID, EPOCH, log

# used to identify bad times
MIN_TIME_BAD = Time("2024-02-01T00:00")

__all__ = [
    "parse_science_filename",
    "parse_raw_meddea_filename",
    "create_science_filename",
    "create_meddea_filename",
    "get_filename_version_base",
    "increment_filename_version",
    "calc_time",
    "has_baseline",
    "is_consecutive",
]


[docs] def parse_raw_meddea_filename(filename: str): """Parse a raw meddea filename Parameters ---------- filename : str Returns ------- tokens : dict """ tokens = parse_science_filename(filename) if filename[0:9].lower() == "padremdu8": tokens["descriptor"] = "housekeeping" elif filename[0:9].lower() == "padremda0": tokens["descriptor"] = "photon" elif filename[0:9].lower() == "padremda2": tokens["descriptor"] = "spectrum" tokens["descriptor"] return tokens
[docs] def create_meddea_filename( time: Time, level: str, descriptor: str, test: str, overwrite: bool = False, ) -> str: """ Generate the MEDDEA filename based on the provided parameters. Parameters ---------- time : Time The time associated with the data. level : str The data level (e.g., "L1", "L2"). descriptor : str The data descriptor (e.g., "SCI", "CAL"). test : str The test identifier (e.g., "TEST1", "TEST2"). overwrite : bool Whether to overwrite existing files. Returns ------- str The generated MEDDEA filename. """ # Filename Version X.Y.Z comes from two parts: # 1. Files Version Base: X.Y comes from the Software Version -> Data Version Mapping # 2. File Version Incrementor: Z starts at 0 and iterates for each new version based on what already exists in the filesystem. version_base = get_filename_version_base() version_increment = 0 version_str = f"{version_base}.{version_increment}" # The Base Filename is used for searching to see if we need to increase our version increment. base_filename = create_science_filename( instrument="meddea", time=time, level=level, descriptor=descriptor, test=test, version=version_str, ) # Check if the LAMBDA_ENVIRONMENT environment variable is set lambda_environment = os.getenv("LAMBDA_ENVIRONMENT") # If we just want to overwrite existing files, the don't bother checking if the version exists if overwrite: if lambda_environment: temp_dir = Path(tempfile.gettempdir()) # Set to temp directory output_path = temp_dir / base_filename else: output_path = Path(base_filename) # Return early return output_path # check if file already exists, if it exists set version to x.y.(max(z)+1) search_pattern = base_filename.replace(version_str, f"{version_str[0:-1]}*") if lambda_environment: # search for existing file in AWS for all files with x.y.z choose largest z and set to x.y.z+1 # Convert the glob pattern to regex pattern regex_pattern = search_pattern.replace(".", "\\.").replace("*", ".*") regex = re.compile(regex_pattern) # Search Lambda Environment for Files client = SWXSOCClient() try: results = client.search( AttrAnd( [ SearchTime(start=time, end=time), Instrument("meddea"), Level(level), Descriptor(descriptor), DevelopmentBucket(lambda_environment != "PRODUCTION"), ] ) ) # Find matches matching_files = [ Path(result["key"]).name for result in results if regex.match(Path(result["key"]).name) ] except Exception as e: log.error(f"Error Searching for Files in Lambda Environment: {e}") matching_files = [] # Check if there are any matching files, if so we need to increment. if len(matching_files) > 0: existing_versions = [ int(parse_science_filename(this_f)["version"].split(".")[-1]) for this_f in matching_files ] incremented_filename = create_science_filename( "meddea", time=time, level=level, descriptor=descriptor, test=test, version=f"{version_base}.{max(existing_versions) + 1}", ) else: incremented_filename = base_filename # Andrew insert code here temp_dir = Path(tempfile.gettempdir()) # Set to temp directory output_path = temp_dir / incremented_filename else: # Search if File Exists Locally if Path(base_filename).exists(): existing_files = Path.cwd().glob(search_pattern) existing_versions = [ int(parse_science_filename(this_f)["version"].split(".")[-1]) for this_f in existing_files ] incremented_filename = create_science_filename( "meddea", time=time, level=level, descriptor=descriptor, test=test, version=f"{version_base}.{max(existing_versions) + 1}", ) else: incremented_filename = base_filename # Return a Path with the local incremented Filename output_path = Path(incremented_filename) return output_path
[docs] def get_filename_version_base() -> str: """ Get the two most significant bits of the version number based on the current version of the software. Returns ------- str The base version string for the filename. For example, "1.0". """ version_mapping_path = ( padre_meddea._data_directory / "software_to_data_version_mapping.csv" ) # Read the version mapping CSV version_mapping = {} try: with open(version_mapping_path, "r") as f: csv_reader = csv.reader(f) for row in csv_reader: if not row: # Skip empty rows continue software_version = row[0] # First column value is the key data_version = row[1] # Rest of the row as values if software_version in version_mapping: version_mapping[software_version].append(data_version) else: version_mapping[software_version] = [data_version] except Exception as e: log.error(f"Error reading version mapping file: {e}") version_mapping = {} # Empty dictionary in case of error # Sort the Data Versions for each Software Version - Then we can get the latest version from Index 0 for key in version_mapping.keys(): version_mapping[key].sort( reverse=True ) # Sort in descending order to get the latest version first # Get the Latest Version (based on keys in the mapping) latest_version = max(version_mapping.keys()) # Get the two most significant bits of the current version number meddea_version = padre_meddea.__version__ current_version_key = ".".join( meddea_version.split(".")[:2] ) # Get the first two parts of the version if current_version_key in version_mapping: # Return the latest data version for the current software version return version_mapping[current_version_key][0] else: log.warning( f"No data version found for software version {current_version_key}. Defaulting to Latest Version. ({latest_version})" ) return version_mapping[latest_version][0]
[docs] def increment_filename_version(file_path: Path, version_index=0): """Given a filename, increment the version number by one. Parameter --------- version_index: int The version index to increment. Index 0 is least significant version. Returns ------- filename : str """ file_path = Path(file_path) tokens = parse_science_filename(file_path.name) version_tuple = [int(i) for i in tokens["version"].split(".")] version_tuple.reverse() version_tuple[version_index] += 1 version_str = f"{version_tuple[2]}.{version_tuple[1]}.{version_tuple[0]}" return file_path.with_name(file_path.name.replace(tokens["version"], version_str))
[docs] def calc_time(pkt_time_s, pkt_time_clk=0, ph_clk=0) -> Time: """ Convert times to a Time object """ deltat = TimeDelta( pkt_time_s * u.s + pkt_time_clk * 0.05 * u.microsecond + ph_clk * 12.8 * u.microsecond ) result = Time(EPOCH + deltat) return result
[docs] def has_baseline(filename: Path, packet_count=10) -> bool: """Given a stream of photon packets, check whether the baseline measurement is included. Baseline packets have one extra word per photon for a total of 4 words (8 bytes). This function calculates the number of hits in the packet assuming 4 words per photon. If the resultant is not an integer number then returns False. Parameters ---------- packet_bytes : byte string Photon packet bytes, must be an integer number of whole packets and greaterh Returns ------- result : bool """ HEADER_BYTES = 11 * 16 / 8 BYTES_PER_PHOTON = 16 * 4 / 8 with open(filename, "rb") as mixed_file: stream_by_apid = split_by_apid(mixed_file) if APID["photon"] in stream_by_apid.keys(): # only applicable to photon packets packet_stream = stream_by_apid[APID["photon"]] packet_bytes = split_packet_bytes(packet_stream) packet_count = min( len(packet_bytes), packet_count ) # in case we have fewer than packet_count in the file num_hits = np.zeros(packet_count) for i in range(packet_count): num_hits[i] = (len(packet_bytes[i]) - HEADER_BYTES) / BYTES_PER_PHOTON else: raise ValueError("Only works on photon packets.") # check if there is any remainder for non integer number of hits return np.sum(num_hits - np.floor(num_hits)) == 0
def str_to_fits_keyword(keyword: str) -> str: """Given a keyword string, return a fits compatible keyword string which must not include special characters and have fewer have no more than 8 characters.""" clean_keyword = "".join(e for e in keyword if e.isalnum()).strip().upper() return clean_keyword[0:8]
[docs] def is_consecutive(arr: np.array) -> bool: """Return True if the packet sequence numbers are all consecutive integers, has no missing numbers.""" MAX_SEQCOUNT = 2**14 - 1 # 16383 # Ensure arr is at least 1D arr = np.atleast_1d(arr) # check if seqcount has wrapped around indices = np.where(arr == MAX_SEQCOUNT) if len(indices[0]) == 0: # no wrap return np.all(np.diff(arr) == 1) else: last_index = 0 result = True for this_ind in indices[0]: this_arr = arr[last_index : this_ind + 1] result = result & np.all(np.diff(this_arr) == 1) last_index = this_ind + 1 # now do the remaining part of the array this_arr = arr[last_index + 1 :] result = result & np.all(np.diff(this_arr) == 1) return result
def trim_timeseries(ts: TimeSeries, t0=MIN_TIME_BAD) -> TimeSeries: """Remove all times in a time series before a given time. Parameters ---------- ts : Time The time before which all data should be removed. Returns ------- TimeSeries A TimeSeries containing the trimmed time series. """ inds = ts.time < t0 bad_count = np.sum(inds) if bad_count > 0: log.warning(f"Found {bad_count} bad times. Removing them.") return ts[~inds] def parse_ph_flags(ph_flags): """Given the photon flag field, parse into its individual components. The flags are stored as follows [15] Int.Time Overflow, [14:12] decimation level, [11:0] # dropped photons Returns ------- decim_lvl, dropped_count, int_time_overflow """ decim_lvl = (ph_flags >> 12) & 0b0111 dropped_count = ph_flags & 2047 int_time_overflow = ph_flags & 32768 return decim_lvl, dropped_count, int_time_overflow def threshold_to_energy(threshold_value: int) -> u.Quantity: """Given a threshold value return the threshold energy. If given a threshold of 63, the pixel is disabled. In that case, return the maximum detectable energy of 100 keV Parameters ---------- threshold_value : int The threshold value Returns ------- threshold_energy : u.Quantity The energy value of the threshold. """ MAX_ENERGY = 100 * u.keV max_threshold = 63 if (0 > threshold_value) or (threshold_value > max_threshold): raise ValueError("Threshold value should be between 0 and 63.") if threshold_value == 63: return MAX_ENERGY msb = 0.8 * u.keV lsb = 0.2 * u.keV lsb_array = np.arange(-3, 60, 1) lsb_array[lsb_array > 52] = 52 msb_array = np.zeros(max_threshold) msb_array[-8:] = np.arange(0, 8) threshold_energy = msb_array * msb + lsb_array * lsb return threshold_energy[threshold_value] def get_file_time(filename) -> Time: """Given filename return the time stamp.""" return Time(f"{filename[0:4]}-{filename[4:6]}-{filename[6:8]}T00:00")