Source code for padre_meddea.net.client

"""
SunPy compatible Fido client for searching and retrieving PADRE MeDDEA data.
"""

import urllib
from collections import OrderedDict
from datetime import timedelta
from html.parser import HTMLParser
from pathlib import Path
from typing import List
from urllib.parse import urljoin

from astropy.time import Time
from sunpy.net import attrs as a
from sunpy.net.attr import SimpleAttr
from sunpy.net.dataretriever import GenericClient, QueryResponse

from padre_meddea import log
from padre_meddea.util.util import parse_science_filename


[docs] class DataType(SimpleAttr): """ Attribute for specifying the data type for the search. Attributes ---------- value : str The data type value. """
[docs] class PADREClient(GenericClient): """ Data source for searching and fetching PADRE MeDDEA Data from SDAC File Servers. """ baseurl = "https://umbra.nascom.nasa.gov/"
[docs] @classmethod def register_values(cls): adict = { a.Provider: [("sdac", "The Solar Data Analysis Center.")], a.Source: [ ("padre", "(The Solar Polarization and Directivity X-Ray Experiment)") ], a.Instrument: [ ( "meddea", "Measuring Directivity to Determine Electron Anisotropy (MeDDEA)", ), ], DataType: [ ("spectrum", "Spectrum data from MeDDEA."), ("photon", "Photon data from MeDDEA."), ("housekeeping", "Housekeeping data from MeDDEA."), ], a.Level: [ ("raw", "Raw Binary CCSDS Packet data"), ("l0", "Raw data, converted to FITS, not in physical units."), ("l1", "Processed data, not in physical units."), ], } return adict
[docs] def search(self, *args, **kwargs) -> QueryResponse: """ Query this client for a list of results. Parameters ---------- \\*args: `tuple` `sunpy.net.attrs` objects representing the query. \\*\\*kwargs: `dict` Any extra keywords to refine the search. Returns ------- A `QueryResponse` instance containing the query result. """ matchdict = self._get_match_dict(*args, **kwargs) # Extract matchdict parameters instruments = matchdict.get("Instrument") levels = matchdict.get("Level") data_types = matchdict.get("DataType") start_time = matchdict.get("Start Time") end_time = matchdict.get("End Time") # Get search paths with data_type search_paths = self._get_search_paths( instruments, levels, data_types, start_time, end_time ) log.debug(f"Search paths: {search_paths}") # Search each path all_files = [] for path in search_paths: url = urljoin(self.baseurl, path) log.debug(f"Searching HTTP directory: {url}") files = self._crawl_directory(url) all_files.extend(files) # Template Replacement for DataType shortname_to_datatype = { "A0": "photon", "A2": "spectrum", "U8": "housekeeping", } # Process and return results metalist = [] for file_url in all_files: log.debug(f"Processing file URL: {file_url}") info = parse_science_filename(file_url) # Fix the DataType Information from the Raw file and filter Raw Files with wrong DataType if info.get("level") == "raw": for shortname, longname in shortname_to_datatype.items(): if shortname in file_url: info["descriptor"] = longname if info["descriptor"] not in data_types: continue # Skip files with wrong DataType # Extract filename and extension using Path path_obj = Path(file_url) filename = path_obj.name file_extension = path_obj.suffix rowdict = OrderedDict() rowdict["Instrument"] = info.get("instrument", "unknown") rowdict["Mode"] = info.get("mode", "unknown") rowdict["Test"] = info.get("test", False) rowdict["Time"] = info.get("time", "unknown") rowdict["Level"] = info.get("level", "unknown") rowdict["Version"] = info.get("version", "unknown") rowdict["Descriptor"] = info.get("descriptor", "unknown") rowdict["File Name"] = filename rowdict["File Extension"] = file_extension rowdict["url"] = file_url # Key metalist.append(rowdict) # pprint(f"Final metalist: {metalist}") return QueryResponse(metalist, client=self)
[docs] def _get_search_paths( self, instruments: List[str] = None, levels: List[str] = None, data_types: List[str] = None, start_time: Time = None, end_time: Time = None, ): """Generate HTTP paths to search based on query parameters.""" paths = [] # Mission Name mission = "padre" time_paths = self._generate_time_paths(start_time, end_time) # Combine all path components for instrument in instruments: for level in levels: if level == "raw": for time_path in time_paths: # For raw data, do not include data type in the path paths.append( f"{mission}/{mission}-{instrument}/{level}/{time_path}/" ) else: # For other levels, include data type in the path for data_type in data_types: for time_path in time_paths: # For other levels, include data type in the path paths.append( f"{mission}/{mission}-{instrument}/{level}/{data_type}/{time_path}/" ) return paths
[docs] @classmethod def _generate_time_paths(cls, start_time: Time, end_time: Time): """ Generate all year/month/day path components between start_time and end_time. Parameters ---------- start_time : astropy.time.Time Start time in ISO format (e.g., '2025-05-04') end_time : astropy.time.Time End time in ISO format (e.g., '2025-07-07') Returns ------- list List of path strings in format 'YYYY/MM/DD' """ # Parse the ISO format times start_date = start_time.datetime end_date = end_time.datetime # Initialize empty list for paths time_paths = [] # Iterate through each day in the range current_date = start_date while current_date <= end_date: # Format as YYYY/MM/DD path = ( f"{current_date.year}/{current_date.month:02d}/{current_date.day:02d}" ) time_paths.append(path) # Move to next day current_date += timedelta(days=1) log.debug( f"Generated {len(time_paths)} time paths from {start_time} to {end_time}" ) return time_paths
[docs] def _crawl_directory(self, url): """Directory crawler using only standard library.""" class LinkParser(HTMLParser): def __init__(self): super().__init__() self.links = [] def handle_starttag(self, tag, attrs): if tag == "a": for attr, value in attrs: if attr == "href": self.links.append(value) files = [] try: with urllib.request.urlopen(url) as response: html = response.read().decode("utf-8") parser = LinkParser() parser.feed(html) for href in parser.links: # Skip parent directory links and query parameters if not href or href.startswith("?") or href == "../": continue full_url = urljoin(url, href) # Don't crawl up: make sure we're still below our starting point if not full_url.startswith(self.baseurl) or len(full_url) < len( self.baseurl ): continue elif href.lower().endswith(".fits") or href.lower().endswith(".dat"): files.append(full_url) return files except Exception as e: log.debug(f"Error processing {url}: {e}") return []