# Licensed under a 3-clause BSD style license - see LICENSE.rst
import os.path
import keyring
import numpy as np
import re
import tarfile
import string
import requests
import warnings
from pkg_resources import resource_filename
from bs4 import BeautifulSoup
import pyvo
from urllib.parse import urljoin
from astropy.table import Table, Column, vstack
from astroquery import log
from astropy.utils import deprecated
from astropy.utils.console import ProgressBar
from astropy.utils.exceptions import AstropyDeprecationWarning
from astropy import units as u
from astropy.time import Time
from ..exceptions import LoginError
from ..utils import commons
from ..utils.process_asyncs import async_to_sync
from ..query import QueryWithLogin
from .tapsql import _gen_pos_sql, _gen_str_sql, _gen_numeric_sql,\
_gen_band_list_sql, _gen_datetime_sql, _gen_pol_sql, _gen_pub_sql,\
_gen_science_sql, _gen_spec_res_sql, ALMA_DATE_FORMAT
from . import conf, auth_urls
from astroquery.utils.commons import ASTROPY_LT_4_1
__all__ = ['AlmaClass', 'ALMA_BANDS']
__doctest_skip__ = ['AlmaClass.*']
ALMA_TAP_PATH = 'tap'
ALMA_SIA_PATH = 'sia2'
ALMA_DATALINK_PATH = 'datalink/sync'
# Map from ALMA ObsCore result to ALMA original query result
# The map is provided in order to preserve the names of the columns in the
# original ALMA query results and to make it backwards compatible
# key - current column, value - original column name
_OBSCORE_TO_ALMARESULT = {
'proposal_id': 'Project code',
'target_name': 'Source name',
's_ra': 'RA',
's_dec': 'Dec',
'gal_longitude': 'Galactic longitude',
'gal_latitude': 'Galactic latitude',
'band_list': 'Band',
's_region': 'Footprint',
'em_resolution': 'Frequency resolution',
'antenna_arrays': 'Array',
'is_mosaic': 'Mosaic',
't_exptime': 'Integration',
'obs_release_date': 'Release date',
'frequency_support': 'Frequency support',
'velocity_resolution': 'Velocity resolution',
'pol_states': 'Pol products',
't_min': 'Observation date',
'obs_creator_name': 'PI name',
'schedblock_name': 'SB name',
'proposal_authors': 'Proposal authors',
'sensitivity_10kms': 'Line sensitivity (10 km/s)',
'cont_sensitivity_bandwidth': 'Continuum sensitivity',
'pwv': 'PWV',
'group_ous_uid': 'Group ous id',
'member_ous_uid': 'Member ous id',
'asdm_uid': 'Asdm uid',
'obs_title': 'Project title',
'type': 'Project type',
'scan_intent': 'Scan intent',
's_fov': 'Field of view',
'spatial_scale_max': 'Largest angular scale',
'qa2_passed': 'QA2 Status',
# TODO COUNT
'science_keyword': 'Science keyword',
'scientific_category': 'Scientific category'
}
ALMA_BANDS = {
'3': (84*u.GHz, 116*u.GHz),
'4': (125*u.GHz, 163*u.GHz),
'5': (163*u.GHz, 211*u.GHz),
'6': (211*u.GHz, 275*u.GHz),
'7': (275*u.GHz, 373*u.GHz),
'8': (385*u.GHz, 500*u.GHz),
'9': (602*u.GHz, 720*u.GHz),
'10': (787*u.GHz, 950*u.GHz)
}
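# Illustrative note: ALMA_BANDS maps receiver band numbers to their
# approximate frequency coverage; e.g. a 230 GHz observation falls in Band 6
# since ALMA_BANDS['6'] == (211*u.GHz, 275*u.GHz).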
ALMA_FORM_KEYS = {
'Position': {
'Source name (astropy Resolver)': ['source_name_resolver',
'SkyCoord.from_name', _gen_pos_sql],
'Source name (ALMA)': ['source_name_alma', 'target_name', _gen_str_sql],
'RA Dec (Sexagesimal)': ['ra_dec', 's_ra, s_dec', _gen_pos_sql],
'Galactic (Degrees)': ['galactic', 'gal_longitude, gal_latitude',
_gen_pos_sql],
'Angular resolution (arcsec)': ['spatial_resolution',
'spatial_resolution', _gen_numeric_sql],
'Largest angular scale (arcsec)': ['spatial_scale_max',
'spatial_scale_max', _gen_numeric_sql],
'Field of view (arcsec)': ['fov', 's_fov', _gen_numeric_sql]
},
'Energy': {
'Frequency (GHz)': ['frequency', 'frequency', _gen_numeric_sql],
'Bandwidth (Hz)': ['bandwidth', 'bandwidth', _gen_numeric_sql],
        'Spectral resolution (kHz)': ['spectral_resolution',
                                      'em_resolution', _gen_spec_res_sql],
'Band': ['band_list', 'band_list', _gen_band_list_sql]
},
'Time': {
'Observation date': ['start_date', 't_min', _gen_datetime_sql],
'Integration time (s)': ['integration_time', 't_exptime',
_gen_numeric_sql]
},
'Polarization': {
'Polarisation type (Single, Dual, Full)': ['polarisation_type',
'pol_states', _gen_pol_sql]
},
'Observation': {
'Line sensitivity (10 km/s) (mJy/beam)': ['line_sensitivity',
'sensitivity_10kms',
_gen_numeric_sql],
'Continuum sensitivity (mJy/beam)': ['continuum_sensitivity',
'cont_sensitivity_bandwidth',
_gen_numeric_sql],
        'Water vapour (mm)': ['water_vapour', 'pwv', _gen_numeric_sql]
},
'Project': {
'Project code': ['project_code', 'proposal_id', _gen_str_sql],
'Project title': ['project_title', 'obs_title', _gen_str_sql],
'PI name': ['pi_name', 'obs_creator_name', _gen_str_sql],
'Proposal authors': ['proposal_authors', 'proposal_authors', _gen_str_sql],
'Project abstract': ['project_abstract', 'proposal_abstract', _gen_str_sql],
'Publication count': ['publication_count', 'NA', _gen_str_sql],
'Science keyword': ['science_keyword', 'science_keyword', _gen_str_sql]
},
'Publication': {
'Bibcode': ['bibcode', 'bib_reference', _gen_str_sql],
'Title': ['pub_title', 'pub_title', _gen_str_sql],
'First author': ['first_author', 'first_author', _gen_str_sql],
'Authors': ['authors', 'authors', _gen_str_sql],
'Abstract': ['pub_abstract', 'pub_abstract', _gen_str_sql],
'Year': ['publication_year', 'pub_year', _gen_numeric_sql]
},
'Options': {
'Public data only': ['public_data', 'data_rights', _gen_pub_sql],
'Science observations only': ['science_observation',
'science_observation', _gen_science_sql]
}
}
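# Each ALMA_FORM_KEYS entry maps a human-readable label to a triplet of
# [payload keyword, ObsCore column(s), SQL generator function]. For example,
# 'Project code': ['project_code', 'proposal_id', _gen_str_sql] means that a
# payload such as {'project_code': '2017.1.01355.L'} is turned by
# _gen_str_sql into a WHERE constraint on the proposal_id column.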
def _gen_sql(payload):
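    """
    Translate a query payload dictionary into an ADQL query against the
    ivoa.obscore table, using the payload-keyword to ObsCore-column mapping
    and SQL generator functions registered in ALMA_FORM_KEYS.

    Illustrative example (the exact WHERE clause is produced by the
    ``_gen_*_sql`` helpers in ``tapsql``)::

        _gen_sql({'project_code': '2011.0.00131.S'})
        # -> roughly "select * from ivoa.obscore
        #             WHERE proposal_id='2011.0.00131.S'"
    """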
sql = 'select * from ivoa.obscore'
where = ''
if payload:
for constraint in payload:
for attrib_category in ALMA_FORM_KEYS.values():
for attrib in attrib_category.values():
if constraint in attrib:
# use the value and the second entry in attrib which
# is the new name of the column
val = payload[constraint]
if constraint == 'em_resolution':
# em_resolution does not require any transformation
attrib_where = _gen_numeric_sql(constraint, val)
else:
attrib_where = attrib[2](attrib[1], val)
if attrib_where:
if where:
where += ' AND '
else:
where = ' WHERE '
where += attrib_where
return sql + where
@async_to_sync
class AlmaClass(QueryWithLogin):
TIMEOUT = conf.timeout
archive_url = conf.archive_url
USERNAME = conf.username
def __init__(self):
# sia service does not need disambiguation but tap does
super(AlmaClass, self).__init__()
self._sia = None
self._tap = None
self._datalink = None
self.sia_url = None
self.tap_url = None
self.datalink_url = None
@property
def datalink(self):
if not self._datalink:
base_url = self._get_dataarchive_url()
if base_url.endswith('/'):
self.datalink_url = base_url + ALMA_DATALINK_PATH
else:
self.datalink_url = base_url + '/' + ALMA_DATALINK_PATH
self._datalink = pyvo.dal.adhoc.DatalinkService(
baseurl=self.datalink_url)
return self._datalink
@property
def sia(self):
if not self._sia:
base_url = self._get_dataarchive_url()
if base_url.endswith('/'):
self.sia_url = base_url + ALMA_SIA_PATH
else:
self.sia_url = base_url + '/' + ALMA_SIA_PATH
self._sia = pyvo.dal.sia2.SIAService(baseurl=self.sia_url)
return self._sia
@property
def tap(self):
if not self._tap:
base_url = self._get_dataarchive_url()
if base_url.endswith('/'):
self.tap_url = base_url + ALMA_TAP_PATH
else:
self.tap_url = base_url + '/' + ALMA_TAP_PATH
self._tap = pyvo.dal.tap.TAPService(baseurl=self.tap_url)
return self._tap
    def query_object_async(self, object_name, cache=None, public=True,
science=True, payload=None, **kwargs):
"""
Query the archive for a source name.
Parameters
----------
object_name : str
            The object name. Will be resolved by
            `astropy.coordinates.SkyCoord.from_name`.
cache : deprecated
public : bool
True to return only public datasets, False to return private only,
None to return both
science : bool
True to return only science datasets, False to return only
calibration, None to return both
payload : dict
Dictionary of additional keywords. See `help`.
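
        Examples
        --------
        Illustrative usage through the sync wrapper generated by
        ``async_to_sync`` (requires network access)::

            >>> results = Alma.query_object('M83')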
"""
if payload is not None:
payload['source_name_resolver'] = object_name
else:
payload = {'source_name_resolver': object_name}
return self.query_async(public=public, science=science,
payload=payload, **kwargs)
    def query_region_async(self, coordinate, radius, cache=None, public=True,
science=True, payload=None, **kwargs):
"""
        Query the ALMA archive with coordinates and a radius
Parameters
----------
        coordinate : str / `astropy.coordinates`
            the identifier or coordinates around which to query
        radius : str / `~astropy.units.Quantity`, optional
            the radius of the region; a bare number is interpreted as degrees
cache : Deprecated
Cache the query?
public : bool
True to return only public datasets, False to return private only,
None to return both
science : bool
True to return only science datasets, False to return only
calibration, None to return both
payload : dict
Dictionary of additional keywords. See `help`.
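
        Examples
        --------
        Illustrative usage through the sync wrapper (requires network
        access)::

            >>> from astropy import coordinates, units as u
            >>> coord = coordinates.SkyCoord(201.365, -43.019, unit='deg')
            >>> results = Alma.query_region(coord, radius=0.05 * u.deg)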
"""
rad = radius
if not isinstance(radius, u.Quantity):
rad = radius*u.deg
obj_coord = commons.parse_coordinates(coordinate).icrs
ra_dec = '{}, {}'.format(obj_coord.to_string(), rad.to(u.deg).value)
if payload is None:
payload = {}
if 'ra_dec' in payload:
payload['ra_dec'] += ' | {}'.format(ra_dec)
else:
payload['ra_dec'] = ra_dec
return self.query_async(public=public, science=science,
payload=payload, **kwargs)
    def query_async(self, payload, cache=None, public=True, science=True,
legacy_columns=False, max_retries=None,
get_html_version=None,
get_query_payload=None, **kwargs):
"""
Perform a generic query with user-specified payload
Parameters
----------
payload : dictionary
Please consult the `help` method
cache : deprecated
public : bool
True to return only public datasets, False to return private only,
None to return both
science : bool
True to return only science datasets, False to return only
calibration, None to return both
legacy_columns : bool
True to return the columns from the obsolete ALMA advanced query,
otherwise return the current columns based on ObsCore model.
Returns
-------
Table with results. Columns are those in the ALMA ObsCore model
(see ``help_tap``) unless ``legacy_columns`` argument is set to True.
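
        Examples
        --------
        Illustrative payload-based query through the sync wrapper (requires
        network access)::

            >>> payload = dict(project_code='2017.1.01355.L',
            ...                source_name_alma='G008.67')
            >>> results = Alma.query(payload)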
"""
        local_args = dict(locals().items())
        for arg, value in local_args.items():
            # warn if any of the deprecated arguments have been used; they
            # are ignored
            if arg in ('cache', 'max_retries', 'get_html_version') \
                    and value is not None:
                warnings.warn(
                    ("Argument '{}' has been deprecated "
                     "since version 4.0.1 and will be ignored".format(arg)),
                    AstropyDeprecationWarning)
                kwargs.pop(arg, None)
if payload is None:
payload = {}
for arg in kwargs:
value = kwargs[arg]
if 'band_list' == arg and isinstance(value, list):
value = ' '.join([str(_) for _ in value])
if arg in payload:
payload[arg] = '{} {}'.format(payload[arg], value)
else:
payload[arg] = value
if science is not None:
payload['science_observation'] = science
if public is not None:
payload['public_data'] = public
if get_query_payload:
return payload
query = _gen_sql(payload)
result = self.query_tap(query, maxrec=payload.get('maxrec', None))
if result is not None:
result = result.to_table()
else:
# Should not happen
raise RuntimeError('BUG: Unexpected result None')
if legacy_columns:
legacy_result = Table()
# add 'Observation date' column
for col_name in _OBSCORE_TO_ALMARESULT:
if col_name in result.columns:
if col_name == 't_min':
legacy_result['Observation date'] = \
[Time(_['t_min'], format='mjd').strftime(
ALMA_DATE_FORMAT) for _ in result]
else:
legacy_result[_OBSCORE_TO_ALMARESULT[col_name]] = \
result[col_name]
else:
log.error("Invalid column mapping in OBSCORE_TO_ALMARESULT: "
"{}:{}. Please "
"report this as an Issue."
.format(col_name, _OBSCORE_TO_ALMARESULT[col_name]))
return legacy_result
return result
    def query_sia(self, pos=None, band=None, time=None, pol=None,
field_of_view=None, spatial_resolution=None,
spectral_resolving_power=None, exptime=None,
timeres=None, publisher_did=None,
facility=None, collection=None,
instrument=None, data_type=None,
calib_level=None, target_name=None,
res_format=None, maxrec=None,
**kwargs):
"""
Use standard SIA2 attributes to query the ALMA SIA service.

        Parameters
        ----------
        The parameters are the standard SIA2 query parameters; see
        `pyvo.dal.sia2` for their detailed descriptions.

Returns
-------
Results in `pyvo.dal.SIAResults` format.
result.table in Astropy table format
"""
return self.sia.search(
pos=pos,
band=band,
time=time,
pol=pol,
field_of_view=field_of_view,
spatial_resolution=spatial_resolution,
spectral_resolving_power=spectral_resolving_power,
exptime=exptime,
timeres=timeres,
publisher_did=publisher_did,
facility=facility,
collection=collection,
instrument=instrument,
data_type=data_type,
calib_level=calib_level,
target_name=target_name,
res_format=res_format,
maxrec=maxrec,
**kwargs)
    def query_tap(self, query, maxrec=None):
"""
        Send query to the ALMA TAP service. Results are returned in
        `pyvo.dal.TAPResults` format; use ``result.to_table()`` to get them
        as an Astropy table.

        Parameters
        ----------
        query : str
            ADQL query to run against the ivoa.obscore table
        maxrec : int
            maximum number of records to return
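
        Examples
        --------
        Illustrative ADQL query (requires network access)::

            >>> result = Alma.query_tap(
            ...     "select top 5 proposal_id, target_name from ivoa.obscore")
            >>> table = result.to_table()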
"""
log.debug('TAP query: {}'.format(query))
return self.tap.search(query, language='ADQL', maxrec=maxrec)
    def help_tap(self):
        print('Table to query is "ivoa.ObsCore".')
        print('For example: "select top 1 * from ivoa.ObsCore"')
        print('The schema of the table is as follows.\n')
print(' {0:20s} {1:15s} {2:10} {3}'.
format('Name', 'Type', 'Unit', 'Description'))
print('-'*90)
for tb in self.tap.tables.items():
if tb[0] == 'ivoa.obscore':
for col in tb[1].columns:
if col.datatype.content == 'char':
type = 'char({})'.format(col.datatype.arraysize)
else:
type = str(col.datatype.content)
unit = col.unit if col.unit else ''
print(' {0:20s} {1:15s} {2:10} {3}'.
format(col.name, type, unit, col.description))
# update method pydocs
query_region_async.__doc__ = query_region_async.__doc__.replace(
'_SIA2_PARAMETERS', pyvo.dal.sia2.SIA_PARAMETERS_DESC)
query_object_async.__doc__ = query_object_async.__doc__.replace(
'_SIA2_PARAMETERS', pyvo.dal.sia2.SIA_PARAMETERS_DESC)
query_async.__doc__ = query_async.__doc__.replace(
'_SIA2_PARAMETERS', pyvo.dal.sia2.SIA_PARAMETERS_DESC)
def _get_dataarchive_url(self):
"""
If the generic ALMA URL is used, query it to determine which mirror to
access for querying data
"""
if not hasattr(self, 'dataarchive_url'):
if self.archive_url in ('http://almascience.org', 'https://almascience.org'):
response = self._request('GET', self.archive_url,
cache=False)
response.raise_for_status()
# Jan 2017: we have to force https because the archive doesn't
# tell us it needs https.
self.dataarchive_url = response.url.replace(
"/asax/", "").replace("/aq/", "").replace("http://", "https://")
else:
self.dataarchive_url = self.archive_url
elif self.dataarchive_url in ('http://almascience.org',
'https://almascience.org'):
raise ValueError("'dataarchive_url' was set to a disambiguation "
"page that is meant to redirect to a real "
"archive. You should only reach this message "
"if you manually specified Alma.dataarchive_url. "
"If you did so, instead consider setting "
"Alma.archive_url. Otherwise, report an error "
"on github.")
return self.dataarchive_url
    @deprecated(since="v0.4.1", alternative="get_data_info")
def stage_data(self, uids, expand_tarfiles=False, return_json=False):
"""
Obtain table of ALMA files
DEPRECATED: Data is no longer staged. This method is deprecated and
kept here for backwards compatibility reasons but it's not fully
compatible with the original implementation.
Parameters
----------
uids : list or str
A list of valid UIDs or a single UID.
UIDs should have the form: 'uid://A002/X391d0b/X7b'
expand_tarfiles : DEPRECATED
return_json : DEPRECATED
Note: The returned astropy table can be easily converted to json
through pandas:
            from io import StringIO
            output = StringIO()
stage_data(...).to_pandas().to_json(output)
table_json = output.getvalue()
Returns
-------
data_file_table : Table
A table containing 3 columns: the UID, the file URL (for future
downloading), and the file size
"""
if return_json:
raise AttributeError(
'return_json is deprecated. See method docs for a workaround')
table = Table()
res = self.get_data_info(uids, expand_tarfiles=expand_tarfiles)
p = re.compile(r'.*(uid__.*)\.asdm.*')
if res:
table['name'] = [u.split('/')[-1] for u in res['access_url']]
table['id'] = [p.search(x).group(1) if 'asdm' in x else 'None'
for x in table['name']]
table['type'] = res['content_type']
table['size'] = res['content_length']
table['permission'] = ['UNKNOWN'] * len(res)
table['mous_uid'] = [uids] * len(res)
table['URL'] = res['access_url']
table['isProprietary'] = res['readable']
return table
    def get_data_info(self, uids, expand_tarfiles=False,
with_auxiliary=True, with_rawdata=True):
"""
Return information about the data associated with ALMA uid(s)
Parameters
----------
uids : list or str
A list of valid UIDs or a single UID.
UIDs should have the form: 'uid://A002/X391d0b/X7b'
expand_tarfiles : bool
False to return information on the tarfiles packages containing
the data or True to return information about individual files in
these packages
with_auxiliary : bool
True to include the auxiliary packages, False otherwise
with_rawdata : bool
True to include raw data, False otherwise
Returns
-------
Table with results or None. Table has the following columns: id (UID),
access_url (URL to access data), content_length, content_type (MIME
type), semantics, description (optional), error_message (optional)
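
        Examples
        --------
        Illustrative usage (the UID is hypothetical; requires network
        access)::

            >>> info = Alma.get_data_info('uid://A002/X391d0b/X7b')
            >>> urls = info['access_url']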
"""
if uids is None:
raise AttributeError('UIDs required')
if isinstance(uids, (str, bytes)):
uids = [uids]
if not isinstance(uids, (list, tuple, np.ndarray)):
raise TypeError("Datasets must be given as a list of strings.")
# TODO remove this loop and send uids at once when pyvo fixed
result = None
for uid in uids:
res = self.datalink.run_sync(uid)
if res.status[0] != 'OK':
raise Exception('ERROR {}: {}'.format(res.status[0],
res.status[1]))
temp = res.to_table()
if ASTROPY_LT_4_1:
# very annoying
for col in [x for x in temp.colnames
if x not in ['content_length', 'readable']]:
temp[col] = temp[col].astype(str)
result = temp if result is None else vstack([result, temp])
to_delete = []
for index, rr in enumerate(result):
if rr['error_message'] is not None and \
rr['error_message'].strip():
log.warning('Error accessing info about file {}: {}'.
format(rr['access_url'], rr['error_message']))
# delete from results. Good thing to do?
to_delete.append(index)
result.remove_rows(to_delete)
if not with_auxiliary:
result = result[np.core.defchararray.find(
result['semantics'], '#aux') == -1]
if not with_rawdata:
result = result[np.core.defchararray.find(
result['semantics'], '#progenitor') == -1]
        # The primary data delivery type is files packaged in tarballs.
        # However, some data can alternatively be retrieved as individual
        # files (semantics='#datalink' and
        # 'content_type=application/x-votable+xml;content=datalink'); this
        # requires an extra call to the datalink service to get the list of
        # files.
DATALINK_FILE_TYPE = 'application/x-votable+xml;content=datalink'
DATALINK_SEMANTICS = '#datalink'
if expand_tarfiles:
# identify the tarballs that can be expandable and replace them
# with the list of components
expanded_result = None
to_delete = []
for index, row in enumerate(result):
if DATALINK_SEMANTICS in row['semantics'] and \
row['content_type'] == DATALINK_FILE_TYPE:
# subsequent call to datalink
file_id = row['access_url'].split('ID=')[1]
expanded_tar = self.get_data_info(file_id)
expanded_tar = expanded_tar[
expanded_tar['semantics'] != '#cutout']
if not expanded_result:
expanded_result = expanded_tar
else:
expanded_result = vstack(
[expanded_result, expanded_tar], join_type='exact')
to_delete.append(index)
# cleanup
result.remove_rows(to_delete)
# add the extra rows
if expanded_result:
result = vstack([result, expanded_result], join_type='exact')
else:
result = result[np.logical_or(np.core.defchararray.find(
result['semantics'].astype(str), DATALINK_SEMANTICS) == -1,
result['content_type'].astype(str) != DATALINK_FILE_TYPE)]
return result
    def is_proprietary(self, uid):
"""
Given an ALMA UID, query the servers to determine whether it is
proprietary or not.
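
        Illustrative usage (the UID is hypothetical; requires network
        access)::

            >>> proprietary = Alma.is_proprietary('uid://A002/X391d0b/X7b')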
"""
query = "select distinct data_rights from ivoa.obscore where " \
"obs_id='{}'".format(uid)
result = self.query_tap(query)
if result:
tableresult = result.to_table()
if not result or len(tableresult) == 0:
raise AttributeError('{} not found'.format(uid))
if len(tableresult) == 1 and tableresult[0][0] == 'Public':
return False
return True
def _HEADER_data_size(self, files):
"""
Given a list of file URLs, return the data size. This is useful for
assessing how much data you might be downloading!
(This is discouraged by the ALMA archive, as it puts unnecessary load
on their system)
"""
totalsize = 0 * u.B
data_sizes = {}
pb = ProgressBar(len(files))
for index, fileLink in enumerate(files):
response = self._request('HEAD', fileLink, stream=False,
cache=False, timeout=self.TIMEOUT)
filesize = (int(response.headers['content-length']) * u.B).to(u.GB)
totalsize += filesize
data_sizes[fileLink] = filesize
log.debug("File {0}: size {1}".format(fileLink, filesize))
pb.update(index + 1)
response.raise_for_status()
return data_sizes, totalsize.to(u.GB)
    def download_files(self, files, savedir=None, cache=True,
continuation=True, skip_unauthorized=True,):
"""
Given a list of file URLs, download them
Note: Given a list with repeated URLs, each will only be downloaded
once, so the return may have a different length than the input list
Parameters
----------
files : list
List of URLs to download
savedir : None or str
The directory to save to. Default is the cache location.
cache : bool
Cache the download?
continuation : bool
Attempt to continue where the download left off (if it was broken)
skip_unauthorized : bool
If you receive "unauthorized" responses for some of the download
requests, skip over them. If this is False, an exception will be
raised.
"""
if self.USERNAME:
auth = self._get_auth_info(self.USERNAME)
else:
auth = None
downloaded_files = []
if savedir is None:
savedir = self.cache_location
for file_link in unique(files):
log.debug("Downloading {0} to {1}".format(file_link, savedir))
try:
check_filename = self._request('HEAD', file_link, auth=auth)
check_filename.raise_for_status()
except requests.HTTPError as ex:
if ex.response.status_code == 401:
if skip_unauthorized:
log.info("Access denied to {url}. Skipping to"
" next file".format(url=file_link))
continue
else:
                        raise ex
try:
filename = re.search("filename=(.*)",
check_filename.headers['Content-Disposition']).groups()[0]
except KeyError:
log.info(f"Unable to find filename for {file_link} "
"(missing Content-Disposition in header). "
"Skipping to next file.")
continue
if savedir is not None:
filename = os.path.join(savedir,
filename)
try:
self._download_file(file_link,
filename,
timeout=self.TIMEOUT,
auth=auth,
cache=cache,
method='GET',
head_safe=False,
continuation=continuation)
downloaded_files.append(filename)
except requests.HTTPError as ex:
if ex.response.status_code == 401:
if skip_unauthorized:
log.info("Access denied to {url}. Skipping to"
" next file".format(url=file_link))
continue
else:
                        raise ex
elif ex.response.status_code == 403:
log.error("Access denied to {url}".format(url=file_link))
if 'dataPortal' in file_link and 'sso' not in file_link:
log.error("The URL may be incorrect. Try using "
"{0} instead of {1}"
.format(file_link.replace('dataPortal/',
'dataPortal/sso/'),
file_link))
raise ex
elif ex.response.status_code == 500:
# empirically, this works the second time most of the time...
self._download_file(file_link,
filename,
timeout=self.TIMEOUT,
auth=auth,
cache=cache,
method='GET',
head_safe=False,
continuation=continuation)
downloaded_files.append(filename)
else:
raise ex
return downloaded_files
def _parse_result(self, response, verbose=False):
"""
Parse a VOtable response
"""
if not verbose:
commons.suppress_vo_warnings()
return response
    def retrieve_data_from_uid(self, uids, cache=True):
"""
Stage & Download ALMA data. Will print out the expected file size
before attempting the download.
Parameters
----------
uids : list or str
A list of valid UIDs or a single UID.
UIDs should have the form: 'uid://A002/X391d0b/X7b'
cache : bool
Whether to cache the downloads.
Returns
-------
downloaded_files : list
A list of the downloaded file paths
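
        Examples
        --------
        Illustrative usage (the UID is hypothetical; proprietary data also
        requires a prior ``Alma.login``)::

            >>> files = Alma.retrieve_data_from_uid('uid://A002/X391d0b/X7b')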
"""
if isinstance(uids, (str, bytes)):
uids = [uids]
if not isinstance(uids, (list, tuple, np.ndarray)):
raise TypeError("Datasets must be given as a list of strings.")
files = self.get_data_info(uids)
file_urls = files['access_url']
totalsize = files['content_length'].sum()*u.B
# each_size, totalsize = self.data_size(files)
log.info("Downloading files of size {0}...".format(totalsize.to(u.GB)))
# TODO: Add cache=cache keyword here. Currently would have no effect.
downloaded_files = self.download_files(file_urls)
return downloaded_files
def _get_auth_info(self, username, store_password=False,
reenter_password=False):
"""
Get the auth info (user, password) for use in another function
"""
if username is None:
if not self.USERNAME:
raise LoginError("If you do not pass a username to login(), "
"you should configure a default one!")
else:
username = self.USERNAME
if hasattr(self, '_auth_url'):
auth_url = self._auth_url
else:
raise LoginError("Login with .login() to acquire the appropriate"
" login URL")
# Get password from keyring or prompt
password, password_from_keyring = self._get_password(
"astroquery:{0}".format(auth_url), username, reenter=reenter_password)
# When authenticated, save password in keyring if needed
if password_from_keyring is None and store_password:
keyring.set_password("astroquery:{0}".format(auth_url), username, password)
return username, password
def _login(self, username=None, store_password=False,
reenter_password=False, auth_urls=auth_urls):
"""
Login to the ALMA Science Portal.
Parameters
----------
username : str, optional
Username to the ALMA Science Portal. If not given, it should be
specified in the config file.
store_password : bool, optional
Stores the password securely in your keyring. Default is False.
reenter_password : bool, optional
Asks for the password even if it is already stored in the
            keyring. This is the way to overwrite an already stored password
on the keyring. Default is False.
"""
success = False
for auth_url in auth_urls:
# set session cookies (they do not get set otherwise)
cookiesetpage = self._request("GET",
urljoin(self._get_dataarchive_url(),
'rh/forceAuthentication'),
cache=False)
self._login_cookiepage = cookiesetpage
cookiesetpage.raise_for_status()
if (auth_url+'/cas/login' in cookiesetpage.request.url):
# we've hit a target, we're good
success = True
break
if not success:
raise LoginError("Could not log in to any of the known ALMA "
"authorization portals: {0}".format(auth_urls))
# Check if already logged in
loginpage = self._request("GET", "https://{auth_url}/cas/login".format(auth_url=auth_url),
cache=False)
root = BeautifulSoup(loginpage.content, 'html5lib')
if root.find('div', class_='success'):
log.info("Already logged in.")
return True
self._auth_url = auth_url
username, password = self._get_auth_info(username=username,
store_password=store_password,
reenter_password=reenter_password)
# Authenticate
log.info("Authenticating {0} on {1} ...".format(username, auth_url))
# Do not cache pieces of the login process
data = {kw: root.find('input', {'name': kw})['value']
for kw in ('execution', '_eventId')}
data['username'] = username
data['password'] = password
data['submit'] = 'LOGIN'
login_response = self._request("POST", "https://{0}/cas/login".format(auth_url),
params={'service': self._get_dataarchive_url()},
data=data,
cache=False)
# save the login response for debugging purposes
self._login_response = login_response
# do not expose password back to user
del data['password']
# but save the parameters for debug purposes
self._login_parameters = data
authenticated = ('You have successfully logged in' in
login_response.text)
if authenticated:
log.info("Authentication successful!")
self.USERNAME = username
else:
log.exception("Authentication failed!")
return authenticated
    def get_cycle0_uid_contents(self, uid):
"""
List the file contents of a UID from Cycle 0. Will raise an error
if the UID is from cycle 1+, since those data have been released in
a different and more consistent format. See
http://almascience.org/documents-and-tools/cycle-2/ALMAQA2Productsv1.01.pdf
for details.
"""
# First, check if UID is in the Cycle 0 listing
if uid in self.cycle0_table['uid']:
cycle0id = self.cycle0_table[
self.cycle0_table['uid'] == uid][0]['ID']
contents = [row['Files']
for row in self._cycle0_tarfile_content
if cycle0id in row['ID']]
return contents
else:
info_url = urljoin(
self._get_dataarchive_url(),
'documents-and-tools/cycle-2/ALMAQA2Productsv1.01.pdf')
raise ValueError("Not a Cycle 0 UID. See {0} for details about "
"cycle 1+ data release formats.".format(info_url))
@property
def _cycle0_tarfile_content(self):
"""
In principle, this is a static file, but we'll retrieve it just in case
"""
if not hasattr(self, '_cycle0_tarfile_content_table'):
url = urljoin(self._get_dataarchive_url(),
'alma-data/archive/cycle-0-tarfile-content')
response = self._request('GET', url, cache=True)
# html.parser is needed because some <tr>'s have form:
# <tr width="blah"> which the default parser does not pick up
root = BeautifulSoup(response.content, 'html.parser')
html_table = root.find('table', class_='grid listing')
data = list(zip(*[(x.findAll('td')[0].text,
x.findAll('td')[1].text)
for x in html_table.findAll('tr')]))
columns = [Column(data=data[0], name='ID'),
Column(data=data[1], name='Files')]
tbl = Table(columns)
assert len(tbl) == 8497
self._cycle0_tarfile_content_table = tbl
else:
tbl = self._cycle0_tarfile_content_table
return tbl
@property
def cycle0_table(self):
"""
Return a table of Cycle 0 Project IDs and associated UIDs.
The table is distributed with astroquery and was provided by Felix
Stoehr.
"""
if not hasattr(self, '_cycle0_table'):
filename = resource_filename(
'astroquery.alma', 'data/cycle0_delivery_asdm_mapping.txt')
self._cycle0_table = Table.read(filename, format='ascii.no_header')
self._cycle0_table.rename_column('col1', 'ID')
self._cycle0_table.rename_column('col2', 'uid')
return self._cycle0_table
    def get_files_from_tarballs(self, downloaded_files, regex=r'.*\.fits$',
path='cache_path', verbose=True):
"""
Given a list of successfully downloaded tarballs, extract files
with names matching a specified regular expression. The default
is to extract all FITS files
NOTE: alma now supports direct listing and downloads of tarballs. See
``get_data_info`` and ``download_and_extract_files``
Parameters
----------
downloaded_files : list
A list of downloaded files. These should be paths on your local
machine.
regex : str
A valid regular expression
path : 'cache_path' or str
If 'cache_path', will use the astroquery.Alma cache directory
(``Alma.cache_location``), otherwise will use the specified path.
Note that the subdirectory structure of the tarball will be
maintained.
Returns
-------
filelist : list
A list of the extracted file locations on disk
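
        Examples
        --------
        Illustrative usage on locally downloaded tarballs (the file name is
        hypothetical)::

            >>> tarballs = ['2013.1.00308.S_uid___A001_X196_X93_001_of_001.tar']
            >>> fits_files = Alma.get_files_from_tarballs(tarballs, path='.')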
"""
if path == 'cache_path':
path = self.cache_location
elif not os.path.isdir(path):
raise OSError("Specified an invalid path {0}.".format(path))
fitsre = re.compile(regex)
filelist = []
for fn in downloaded_files:
tf = tarfile.open(fn)
for member in tf.getmembers():
if fitsre.match(member.name):
if verbose:
log.info("Extracting {0} to {1}".format(member.name,
path))
tf.extract(member, path)
filelist.append(os.path.join(path, member.name))
return filelist
    def download_and_extract_files(self, urls, delete=True, regex=r'.*\.fits$',
include_asdm=False, path='cache_path',
verbose=True):
"""
        Given a list of tarball URLs, download them and extract all the FITS
        files (or whatever matches the regex)
Parameters
----------
urls : str or list
A single URL or a list of URLs
include_asdm : bool
Only affects cycle 1+ data. If set, the ASDM files will be
downloaded in addition to the script and log files. By default,
though, this file will be downloaded and deleted without extracting
any information: you must change the regex if you want to extract
data from an ASDM tarball
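
        Examples
        --------
        Illustrative usage (the UID is hypothetical; requires network
        access)::

            >>> info = Alma.get_data_info('uid://A002/X391d0b/X7b')
            >>> tar_urls = [url for url in info['access_url']
            ...             if url.endswith('.tar')]
            >>> fits_files = Alma.download_and_extract_files(tar_urls)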
"""
if isinstance(urls, str):
urls = [urls]
if not isinstance(urls, (list, tuple, np.ndarray)):
raise TypeError("Datasets must be given as a list of strings.")
filere = re.compile(regex)
all_files = []
tar_files = []
expanded_files = []
for url in urls:
if url[-4:] != '.tar':
raise ValueError("URLs should be links to tarballs.")
tarfile_name = os.path.split(url)[-1]
if tarfile_name in self._cycle0_tarfile_content['ID']:
# It is a cycle 0 file: need to check if it contains FITS
match = (self._cycle0_tarfile_content['ID'] == tarfile_name)
if not any(re.match(regex, x) for x in
self._cycle0_tarfile_content['Files'][match]):
log.info("No FITS files found in {0}".format(tarfile_name))
continue
else:
if 'asdm' in tarfile_name and not include_asdm:
log.info("ASDM tarballs do not contain FITS files; "
"skipping.")
continue
tar_file = url.split('/')[-1]
files = self.get_data_info(tar_file)
if files:
expanded_files += [x for x in files['access_url'] if
filere.match(x.split('/')[-1])]
else:
tar_files.append(url)
try:
# get the tar files
downloaded = self.download_files(tar_files, savedir=path)
fitsfilelist = self.get_files_from_tarballs(downloaded,
regex=regex, path=path,
verbose=verbose)
if delete:
for tarball_name in downloaded:
log.info("Deleting {0}".format(tarball_name))
os.remove(tarball_name)
all_files += fitsfilelist
# download the other files
all_files += self.download_files(expanded_files, savedir=path)
except requests.ConnectionError as ex:
self.partial_file_list = all_files
log.error("There was an error downloading the file. "
"A partially completed download list is "
"in Alma.partial_file_list")
raise ex
except requests.HTTPError as ex:
if ex.response.status_code == 401:
log.info("Access denied to {url}. Skipping to"
" next file".format(url=url))
else:
raise ex
return all_files
    def help(self, cache=True):
"""
Return the valid query parameters
"""
print("\nMost common ALMA query keywords are listed below. These "
"keywords are part of the ALMA ObsCore model, an IVOA standard "
"for metadata representation (3rd column). They were also "
"present in original ALMA Web form and, for backwards "
"compatibility can be accessed with their old names (2nd "
"column).\n"
"More elaborate queries on the ObsCore model "
"are possible with `query_sia` or `query_tap` methods")
print(" {0:33s} {1:35s} {2:35s}".format("Description",
"Original ALMA keyword",
"ObsCore keyword"))
print("-"*103)
for title, section in ALMA_FORM_KEYS.items():
print()
print(title)
for row in section.items():
print(" {0:33s} {1:35s} {2:35s}".format(row[0], row[1][0], row[1][1]))
print('\nExamples of queries:')
print("Alma.query('proposal_id':'2011.0.00131.S'}")
print("Alma.query({'band_list': ['5', '7']}")
print("Alma.query({'source_name_alma': 'GRB021004'})")
print("Alma.query(payload=dict(project_code='2017.1.01355.L', "
"source_name_alma='G008.67'))")
def _json_summary_to_table(self, data, base_url):
"""
        Special tool to convert some JSON metadata to a table. Obsolete as of
        March 2020 - should be removed along with stage_data_prefeb2020.
"""
from ..utils import url_helpers
columns = {'mous_uid': [], 'URL': [], 'size': []}
for entry in data['node_data']:
# de_type can be useful (e.g., MOUS), but it is not necessarily
# specified
# file_name and file_key *must* be specified.
is_file = \
(entry['file_name'] != 'null' and entry['file_key'] != 'null')
if is_file:
# "de_name": "ALMA+uid://A001/X122/X35e",
columns['mous_uid'].append(entry['de_name'][5:])
if entry['file_size'] == 'null':
columns['size'].append(np.nan * u.Gbyte)
else:
columns['size'].append(
(int(entry['file_size']) * u.B).to(u.Gbyte))
# example template for constructing url:
# https://almascience.eso.org/dataPortal/requests/keflavich/940238268/ALMA/
# uid___A002_X9d6f4c_X154/2013.1.00546.S_uid___A002_X9d6f4c_X154.asdm.sdm.tar
# above is WRONG... except for ASDMs, when it's right
# should be:
# 2013.1.00546.S_uid___A002_X9d6f4c_X154.asdm.sdm.tar/2013.1.00546.S_uid___A002_X9d6f4c_X154.asdm.sdm.tar
#
# apparently ASDMs are different from others:
# templates:
# https://almascience.eso.org/dataPortal/requests/keflavich/946895898/ALMA/
# 2013.1.00308.S_uid___A001_X196_X93_001_of_001.tar/2013.1.00308.S_uid___A001_X196_X93_001_of_001.tar
# uid___A002_X9ee74a_X26f0/2013.1.00308.S_uid___A002_X9ee74a_X26f0.asdm.sdm.tar
url = url_helpers.join(base_url,
entry['file_key'],
entry['file_name'])
if 'null' in url:
raise ValueError("The URL {0} was created containing "
"'null', which is invalid.".format(url))
columns['URL'].append(url)
columns['size'] = u.Quantity(columns['size'], u.Gbyte)
tbl = Table([Column(name=k, data=v) for k, v in columns.items()])
return tbl
Alma = AlmaClass()
def clean_uid(uid):
"""
Return a uid with all unacceptable characters replaced with underscores
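
    For example, ``clean_uid('uid://A002/X391d0b/X7b')`` returns
    ``'uid___A002_X391d0b_X7b'``.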
"""
if not hasattr(uid, 'replace'):
return clean_uid(str(uid.astype('S')))
try:
return uid.decode('utf-8').replace(u"/", u"_").replace(u":", u"_")
except AttributeError:
return uid.replace("/", "_").replace(":", "_")
def reform_uid(uid):
"""
Convert a uid with underscores to the original format
"""
return uid[:3] + "://" + "/".join(uid[6:].split("_"))
def unique(seq):
"""
Return unique elements of a list, preserving order
"""
seen = set()
seen_add = seen.add
return [x for x in seq if not (x in seen or seen_add(x))]
def filter_printable(s):
""" extract printable characters from a string """
    return ''.join(filter(lambda x: x in string.printable, s))
def uid_json_to_table(jdata,
productlist=['ASDM', 'PIPELINE_PRODUCT',
'PIPELINE_PRODUCT_TARFILE',
'PIPELINE_AUXILIARY_TARFILE']):
rows = []
def flatten_jdata(this_jdata, mousID=None):
if isinstance(this_jdata, list):
for item in this_jdata:
if item['type'] in productlist:
item['mous_uid'] = mousID
rows.append(item)
elif len(item['children']) > 0:
if len(item['allMousUids']) == 1:
flatten_jdata(item['children'], item['allMousUids'][0])
else:
flatten_jdata(item['children'])
flatten_jdata(jdata['children'])
keys = rows[-1].keys()
columns = [Column(data=[row[key] for row in rows], name=key)
for key in keys if key not in ('children', 'allMousUids')]
columns = [col.astype(str) if col.dtype.name == 'object' else col for col
in columns]
return Table(columns)