# Licensed under a 3-clause BSD style license - see LICENSE.rst
import re
import os
import io
import requests
import warnings
import numpy as np
from astropy.table import Table
import astropy.io.fits as fits
from astroquery.exceptions import NoResultsWarning
__all__ = ['query', 'save_file', 'get_file']
id_parse = re.compile(r'ID\=(\d+)')
# should skip only if remote_data = False
__doctest_skip__ = ['query', 'save_file', 'get_file']
uri = 'https://sha.ipac.caltech.edu/applications/Spitzer/SHA/servlet/DataService?'
def query(coord=None, ra=None, dec=None, size=None, naifid=None, pid=None,
          reqkey=None, dataset=2, verbosity=3, return_response=False,
          return_payload=False):
    """
    Query the Spitzer Heritage Archive (SHA).

    Four query types are valid to search by position, NAIFID, PID, and
    ReqKey::

        position -> search a region
        naifid   -> NAIF ID, which is a unique number allocated to solar
                    system objects (e.g. planets, asteroids, comets,
                    spacecraft) by the NAIF at JPL.
        pid      -> program ID
        reqkey   -> AOR ID: Astronomical Observation Request ID

    For a valid query, enter only parameters related to a single query
    type::

        position -> ra, dec, size
        naifid   -> naifid
        pid      -> pid
        reqkey   -> reqkey

    Parameters
    ----------
    coord : astropy.coordinates.builtin_systems
        Astropy coordinate object. (query_type = 'position')
    ra : number
        Right ascension in degrees, alternative to using ``coord``.
        (query_type = 'position')
    dec : number
        Declination in degrees, alternative to using ``coord``.
        (query_type = 'position')
    size : number
        Region size in degrees. (query_type = 'position')
    naifid : number
        NAIF ID. (query_type = 'naifid')
    pid : number
        Program ID. (query_type = 'pid')
    reqkey : number
        Astronomical Observation Request ID. (query_type = 'reqkey')
    dataset : number, default 2
        Data set. Valid options::

            1 -> BCD data
            2 -> PBCD data

    verbosity : number, default 3
        Verbosity level, controls the number of columns to output.
    return_response : bool, default False
        Return the raw ``requests`` response instead of a parsed table.
    return_payload : bool, default False
        Return the request payload dict without contacting the server
        (useful for debugging).

    Returns
    -------
    table : `~astropy.table.Table`

    Examples
    --------
    Position query using an astropy coordinate object

    >>> import astropy.coordinates as coord
    >>> import astropy.units as u
    >>> from astroquery import sha
    >>> pos_t = sha.query(coord=coord.SkyCoord(ra=163.6136, dec=-11.784,
    ... unit=(u.degree, u.degree)), size=0.5)

    Position query with optional ``ra`` and ``dec`` parameters

    >>> pos_t = sha.query(ra=163.6136, dec=-11.784, size=0.5)

    NAIFID query

    >>> nid_t = sha.query(naifid=2003226)

    PID query

    >>> pid_t = sha.query(pid=30080)

    ReqKey query

    >>> rqk_t = sha.query(reqkey=21641216)

    Notes
    -----
    For column descriptions, metadata, and other information visit the SHA
    query API_ help page

    .. _API: https://sha.ipac.caltech.edu/applications/Spitzer/SHA/help/doc/api.html
    """
    # Use Coordinate instance if supplied; transform once and reuse for
    # both RA and Dec (the original transformed twice).
    if coord is not None:
        try:
            fk5 = coord.transform_to('fk5')
        except ValueError:
            raise ValueError('Cannot parse `coord` variable.')
        ra = fk5.ra.degree
        dec = fk5.dec.degree
    # Query parameters; unused entries stay None and are ignored server-side.
    payload = {'RA': ra,
               'DEC': dec,
               'SIZE': size,
               'NAIFID': naifid,
               'PID': pid,
               'REQKEY': reqkey,
               'VERB': verbosity,
               'DATASET': 'ivo://irsa.ipac.spitzer.level{0}'.format(dataset)}
    if return_payload:
        return payload
    # Make request
    response = requests.get(uri, params=payload)
    if return_response:
        return response
    response.raise_for_status()
    # Parse the fixed-width, '|'-delimited IPAC table. Because of the
    # '|foo|bar|' delimiters, drop the first and last (empty) columns.
    raw_data = response.text.split('\n')
    field_widths = [len(s) + 1 for s in raw_data[1].split('|')][1:-1]
    col_names = [s.strip() for s in raw_data[1].split('|')][1:-1]
    type_names = [s.strip() for s in raw_data[2].split('|')][1:-1]
    # Cumulative start/stop offsets for fixed-width slicing of each row.
    cs = [0] + np.cumsum(field_widths).tolist()

    def parse_line(line, cs=cs):
        # Slice one row into its fixed-width fields.
        return [line[a:b] for a, b in zip(cs[:-1], cs[1:])]

    # Data rows start after the 4 header lines; the final element is the
    # empty string left after the trailing newline, so drop it.
    data = [parse_line(row) for row in raw_data[4:-1]]
    # Parse type names
    dtypes = _map_dtypes(type_names, field_widths)
    # To table: transpose rows into columns for Table construction.
    if len(data) > 0:
        table = Table(list(zip(*data)), names=col_names, dtype=dtypes)
    else:
        warnings.warn(NoResultsWarning("No matching rows were found in the query."))
        table = Table()
    return table
def save_file(url, out_dir='sha_tmp/', out_name=None):
    """
    Download image to output directory given a URL from a SHA query.

    Parameters
    ----------
    url : str
        Access URL from SHA query. Requires complete URL, valid URLs from the
        SHA query include columns::

            accessUrl -> The URL to be used to retrieve an image or table
            withAnc1  -> The URL to be used to retrieve the image or spectra
                         with important ancillary products (mask, uncertainty,
                         etc.) as a zip archive

    out_dir : str
        Path for output table or image
    out_name : str
        Name for output table or image, if None use the file ID as name.

    Examples
    --------
    >>> from astroquery import sha
    >>> url = sha.query(pid=30080)['accessUrl'][0]
    >>> sha.save_file(url)
    """
    # Map the server-reported Content-Type to a file extension.
    exten_types = {'image/fits': '.fits',
                   'text/plain; charset=UTF-8': '.tbl',
                   'application/zip': '.zip',
                   }
    # Make request; stream so large files are written in chunks.
    response = requests.get(url, stream=True)
    try:
        response.raise_for_status()
        # Name file using the numeric ID at the end of the URL.
        if out_name is None:
            out_name = 'shaID_' + id_parse.findall(url)[0]
        # Determine extension from the response headers.
        exten = exten_types[response.headers['Content-Type']]
        # exist_ok avoids a race between the exists() check and makedirs().
        os.makedirs(out_dir, exist_ok=True)
        # os.path.join works whether or not out_dir has a trailing slash
        # (plain concatenation required one).
        with open(os.path.join(out_dir, out_name + exten), 'wb') as f:
            for block in response.iter_content(1024):
                f.write(block)
    finally:
        # Streamed responses must be released explicitly.
        response.close()
def get_file(url):
    """
    Return object from SHA query URL.

    Currently only supports FITS files.

    Parameters
    ----------
    url : str
        Access URL from SHA query. Requires complete URL, valid URLs from the
        SHA query include columns::

            accessUrl -> The URL to be used to retrieve an image or table
            withAnc1  -> The URL to be used to retrieve the image or spectra
                         with important ancillary products (mask, uncertainty,
                         etc.) as a zip archive

    Returns
    -------
    obj : `astropy.io.fits.HDUList`
        Opened FITS object for the downloaded file.

    Raises
    ------
    ValueError
        If the response Content-Type is not ``image/fits``.

    Examples
    --------
    >>> from astroquery import sha
    >>> url = sha.query(pid=30080)['accessUrl'][0]
    >>> img = sha.get_file(url)
    """
    # Make request. The whole body is consumed via ``response.content``
    # anyway, so streaming would only delay the download, not avoid it.
    response = requests.get(url)
    response.raise_for_status()
    content_type = response.headers['Content-Type']
    if content_type != 'image/fits':
        # ValueError (a subclass of Exception) instead of a bare Exception,
        # so callers can catch it specifically.
        raise ValueError('Unknown content type: {0}.'.format(content_type))
    # Read the FITS payload from an in-memory buffer.
    return fits.open(io.BytesIO(response.content))
def _map_dtypes(type_names, field_widths):
"""
Create dtype string based on column lengths and field type names.
Parameters
----------
type_names : list
List of type names from file header
field_widths : list
List of field width values
Returns
-------
dtypes : list
List of dtype for each column in data
"""
dtypes = []
for i, name in enumerate(type_names):
if name == 'int':
dtypes.append('i8')
elif name == 'double':
dtypes.append('f8')
elif name == 'char':
dtypes.append('a{0}'.format(field_widths[i]))
else:
raise ValueError('Unexpected type name: {0}.'.format(name))
return dtypes