Source code for mlso.api.client

# -*- coding: utf-8 -*-

"""Module defining the Python and command line client.
"""

import argparse
import datetime
import logging
import math
import os
from pathlib import Path
from pprint import pformat
import sys
import textwrap

import requests
import tqdm


from . import __version__

BASE_URL = "http://api.mlso.ucar.edu:5000"
API_VERSION = "v1"
SIGNUP_URL = "https://registration.hao.ucar.edu"


# chunk size for downloading files, 1-10M is probably the most efficient size
# for good speed and reasonable memory usage
CHUNK_SIZE = 1024 * 1024

session = requests.Session()

# setup default logging, users of this library can set this to their own logger
# or modify this one to suit their needs
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)


# API


[docs] class UserNotFound(Exception): """Exception raised when a username is found in the registration database.""" pass
[docs] class ServerError(Exception): """Exception raised when there is a problem with the server not returning a valid result. """ pass
[docs] def about( base_url: str = BASE_URL, api_version: str = API_VERSION, verbose: bool = False ): """Retrieve the results of the `/about` endpoint.""" url = f"{base_url}/{api_version}/about" if verbose: logger.debug(f"URL: {url}") try: r = requests.get(url) except requests.exceptions.ConnectionError as e: raise ServerError(f"Connection error reaching {url}") if not r.ok: j = r.json() msg = j["message"] raise ServerError(f"Server response: {r.status_code} {r.reason} ({msg})") j = r.json() if verbose: logger.debug(pformat(j)) return j
[docs] def instruments( base_url: str = BASE_URL, api_version: str = API_VERSION, verbose: bool = False ): """Retrieve list of instruments from the `/instruments/{instrument}` endpoint with some of their properties. """ url = f"{base_url}/{api_version}/instruments" if verbose: logger.debug(f"URL: {url}") try: r = requests.get(url) except requests.exceptions.ConnectionError as e: raise ServerError(f"Connection error reaching {url}") if not r.ok: raise ServerError(f"Server response: {r.status_code} {r.reason}") instruments = r.json() if verbose: logger.debug(pformat(instruments)) instruments = sorted(instruments) results = [] for instrument in instruments: url = f"{base_url}/{api_version}/instruments/{instrument}/" if verbose: logger.debug(f"URL: {url}") try: r = requests.get(url) except requests.exceptions.ConnectionError as e: raise ServerError(f"Connection error reaching {url}") j = r.json() if verbose: logger.debug(pformat(j)) dates = j["dates"] full_name = j["name"] start_date = dates["start-date"] end_date = dates["end-date"] i = { "id": instrument, "start-date": start_date, "end-date": end_date, "name": j["name"], } results.append(i) return results
[docs] def products( instrument, base_url: str = BASE_URL, api_version: str = API_VERSION, verbose: bool = False, ): """Handle retrieving the `/instruments/{instrument}/products` endpoint results. Can raise ServerError if there is a problem with the web request. """ url = f"{base_url}/{api_version}/instruments/{instrument}/products" if verbose: logger.debug(f"URL: {url}") try: r = requests.get(url) except requests.exceptions.ConnectionError as e: raise ServerError(f"Connection error reaching {url}") if not r.ok: j = r.json() msg = j["message"] raise ServerError(f"Server response: {r.status_code} {r.reason} ({msg})") j = r.json() if verbose: logger.debug(pformat(j)) return j
[docs] def authenticate( username: str = None, base_url: str = BASE_URL, api_version: str = API_VERSION, verbose: bool = False, ): """Authenticate username within the session. This must be called before `download_file`. """ if username is None: msg = f"username required, sign up at {SIGNUP_URL}" raise UserNotFound(msg) else: url = f"{base_url}/{api_version}/authenticate?username={username}" if verbose: logger.debug(pformat(f"URL: {url}")) try: r = session.get(url) except requests.exceptions.ConnectionError as e: raise ServerError(f"Connection error reaching {url}") if not r.ok: if r.status_code == 404: info = r.json() if verbose: logger.debug(pformat(info)) raise UserNotFound(info["message"]) else: j = r.json() msg = j["message"] raise ServerError( f"Server response: {r.status_code} {r.reason} ({msg})" )
[docs] def download_file(file, output_dir): """Download a single file to the given output directory. The `file` argument is a dict with fields "url" and "filename". `output_dir` is simply the directory to put the downloaded file. Can raise ServerError if there is a problem with the web request. """ url = file["url"] try: r = session.get(url, stream=True, cookies=session.cookies.get_dict()) except requests.exceptions.ConnectionError as e: raise ServerError(f"Connection error reaching {url}") if not r.ok: raise ServerError(f"Server response: {r.status_code} {r.reason}") path = Path(output_dir) / file["filename"] with open(path, "wb") as handle: for data in r.iter_content(chunk_size=CHUNK_SIZE): handle.write(data) return path
[docs] def files( instrument: str, product: str, filters: list[dict[str, str]] | None = None, base_url: str = BASE_URL, api_version: str = API_VERSION, verbose=False, ): """Handle retrieving the `/instruments/{instrument}/products/{product}` endpoint results. Can raise ServerError if there is a problem with the web request. Use `download_file` to download the file(s) returned with routine. """ url = f"{base_url}/{api_version}/instruments/{instrument}/products/{product}" if len(filters) > 0: url += "?" + "&".join([f"{f}={filters[f]}" for f in filters]) if verbose: logger.debug(f"URL: {url}") try: r = requests.get(url) except requests.exceptions.ConnectionError as e: raise ServerError(f"Connection error reaching {url}") if not r.ok: j = r.json() msg = j["message"] raise ServerError(f"Server response: {r.status_code} {r.reason} ({msg})") j = r.json() if verbose: logger.debug(pformat(j)) return j
# Command line interface sub-command handlers def _about(args): """Handle printing the `/about` endpoint results for the command line interface. """ try: about_response = about(args.base_url, verbose=args.verbose) server_version = about_response["version"] documentation_url = about_response["documentation"] print( f"server version: {server_version}, client version: {__version__}" ) print(f"documentation: {documentation_url}") except ServerError as e: print(e) def _instruments(args): """Handle printing the `/instruments` endpoint results for the command line interface. """ try: instruments_response = instruments(args.base_url, verbose=args.verbose) except ServerError as e: print(e) return print(f"{'ID':8s} {'Instrument name':44s} Dates available") print(f"{'-' * 8} {'-' * 44} {'-' * 23}") for i in instruments_response: instrument = i["id"] instrument_name = i["name"] start_date = i["start-date"][:10] end_date = i["end-date"][:10] print(f"{instrument:8s} {instrument_name:44s} {start_date}...{end_date}") def _products(args): """Handle printing the `/instruments/{instrument}/products` endpoint results for the command line interface. """ try: products_response = products( args.instrument, base_url=args.base_url, verbose=args.verbose ) except ServerError as e: print(e) return print(f"{'ID':13s} {'Title':22s} {'Description'}") print(f"{'-' * 13} {'-' * 22} {'-' * 55}") for p in products_response["products"]: title = p["title"] product_id = p["id"] description = textwrap.wrap(p["description"], width=55) if len(description) == 0: description = [""] for description_line in description: print(f"{product_id:13s} {title:22s} {description_line}") product_id = "" title = "" name = "" def _download_files( base_url: str, filelist: list, output_dir: Path, username: str, verbose: bool = False, quiet: bool = False, ): """Download the given files to an output directory. The `files` argument is a list of dicts with fields "url" and "filename". """ if not output_dir.is_dir(): if verbose: print(f"creating output path {output_dir}") os.makedirs(output_dir) try: authenticate(username, base_url=base_url, verbose=verbose) except UserNotFound as e: print(e) sys.exit(1) except ServerError as e: print(e) sys.exit(1) if quiet: iterable_files = filelist message = print else: iterable_files = tqdm.tqdm(filelist) message = tqdm.tqdm.write n_failed = 0 for f in iterable_files: try: filepath = download_file(f, output_dir) except ServerError as e: message(f"{f['url']} failed") n_failed += 1 if n_failed > 0: print(f"{n_failed} failed downloads") unit_list = list(zip(["B", "KB", "MB", "GB", "TB", "PB"], [0, 0, 1, 2, 2, 2]))
[docs] def sizeof_fmt(n_bytes: int) -> str: """Human friendly file size""" if n_bytes == 0: return "0 B" if n_bytes >= 1: exponent = min(int(math.log(n_bytes, 1024)), len(unit_list) - 1) quotient = float(n_bytes) / 1024**exponent unit, num_decimals = unit_list[exponent] format_string = "{:.%sf} {}" % (num_decimals) return format_string.format(quotient, unit)
def _files(args): """Handle printing the `/instruments/{instrument}/products/{product}` endpoint results, optionally downloading the files. """ filters = {} if args.wave_region is not None: filters["wave-region"] = args.wave_region if args.obs_plan is not None: filters["obs-plan"] = args.obs_plan if args.start_date is not None: filters["start-date"] = args.start_date if args.end_date is not None: filters["end-date"] = args.end_date if args.carrington_rotation is not None: filters["cr"] = args.carrington_rotation if args.every is not None: filters["every"] = args.every try: files_response = files( args.instrument, args.product, filters, base_url=args.base_url, verbose=args.verbose, ) except ServerError as e: print(e) return if args.verbose: instrument = files_response["instrument"] product = files_response["product"] start_date = files_response["start-date"] end_date = files_response["end-date"] filesize = sizeof_fmt(files_response["total_filesize"]) print(f"Instrument : {instrument}") print(f"Product : {product}") print(f"Start date : {start_date}") print(f"End date : {end_date}") print(f"Filesize : {filesize}") filelist = files_response["files"] if args.download: _download_files( args.base_url, filelist, Path(args.output_dir), args.username, verbose=args.verbose, quiet=args.quiet, ) else: if len(filelist) > 0: if args.verbose: print() max_filename_len = max([len(f["filename"]) for f in filelist]) print( f"{'Date/time':20s} {'Instrument':10s} {'Product':13s} {'Filesize':10s} {'Filename'}" ) print( f"{'-' * 20} {'-' * 10} {'-' * 13} {'-' * 10} {'-' * max_filename_len}" ) total_filesize = 0 for f in filelist: total_filesize += f["filesize"] print( f"{f['date-obs']:20s} {f['instrument']:10s} {f['product']:13s} {sizeof_fmt(f['filesize']):>10s} {f['filename']}" ) if len(filelist) > 1: print( f"{'-' * 20} {'-' * 10} {'-' * 13} {'-' * 10} {'-' * max_filename_len}" ) n_files = f"{len(filelist)} files" print(f"{n_files:45s} {sizeof_fmt(total_filesize):>10s} {''}") def _print_help(args): """Print the usage help for the command-line utility.""" args.parser.print_help()
[docs] def main(): """Entry point for MLSO API command-line utility.""" name = f"MLSO API command line interface (mlso-api-client {__version__})" parser = argparse.ArgumentParser(description=name) parser.add_argument("-v", "--version", action="version", version=name) # show help if no sub-command given parser.set_defaults(func=_print_help, parser=parser) parser.add_argument( "-u", "--base-url", help="base URL for MLSO API", default=BASE_URL ) parser.add_argument("--verbose", help="output warnings", action="store_true") parser.add_argument( "-q", "--quiet", help="surpress informational messages", action="store_true" ) subparsers = parser.add_subparsers(help="sub-command help") instruments_parser = subparsers.add_parser("instruments", help="MLSO instruments") instruments_parser.set_defaults(func=_instruments, parser=instruments_parser) products_parser = subparsers.add_parser("products", help="MLSO instruments") products_parser.add_argument("-i", "--instrument", help="instrument", default=None) products_parser.set_defaults(func=_products, parser=products_parser) files_parser = subparsers.add_parser("files", help="MLSO data files") files_parser.add_argument("-i", "--instrument", help="instrument", default=None) files_parser.add_argument("-p", "--product", help="product", default=None) files_parser.add_argument( "--wave-region", help="wave region, e.g., 1074, 1079, etc.", default=None ) files_parser.add_argument( "--obs-plan", help="observing plan: synoptic or waves", default=None ) files_parser.add_argument("-s", "--start-date", help="start date", default=None) files_parser.add_argument("-e", "--end-date", help="end date", default=None) files_parser.add_argument( "-c", "--carrington-rotation", help="Carrington Rotation number", default=None ) files_parser.add_argument( "--every", help="time to choose 1 file from", default=None ) files_parser.add_argument( "-d", "--download", help="download the displayed files", action="store_true" ) files_parser.add_argument( "-u", "--username", help="email already registered at HAO website", default=None ) files_parser.add_argument( "-o", "--output-dir", help="output directory for downloaded files", default="." ) files_parser.set_defaults(func=_files, parser=files_parser) # parse args and call appropriate sub-command args = parser.parse_args() if args.verbose: _about(args) print() if parser.get_default("func"): try: args.func(args) except KeyboardInterrupt: print() else: parser.print_help()
if __name__ == "__main__": main()