# Licensed under a 3-clause BSD style license - see LICENSE.rst
import time
import sys
import os.path
import shutil
import webbrowser
import warnings
import keyring
import numpy as np
import re
from bs4 import BeautifulSoup
from io import BytesIO
from astropy.table import Table, Column
from astroquery import log
from ..exceptions import LoginError, RemoteServiceError, NoResultsWarning
from ..utils import schema, system_tools
from ..query import QueryWithLogin, suspend_cache
from . import conf
__doctest_skip__ = ['EsoClass.*']
def _check_response(content):
"""
Check the response from an ESO service query for various types of error
If all is OK, return True
"""
if b"NETWORKPROBLEM" in content:
raise RemoteServiceError("The query resulted in a network "
"problem; the service may be offline.")
elif b"# No data returned !" not in content:
return True
[docs]class EsoClass(QueryWithLogin):
ROW_LIMIT = conf.row_limit
USERNAME = conf.username
QUERY_INSTRUMENT_URL = conf.query_instrument_url
def __init__(self):
super(EsoClass, self).__init__()
self._instrument_list = None
self._survey_list = None
self.username = None
def _activate_form(self, response, form_index=0, form_id=None, inputs={},
cache=True, method=None):
"""
Parameters
----------
method: None or str
Can be used to override the form-specified method
"""
# Extract form from response
root = BeautifulSoup(response.content, 'html5lib')
if form_id is None:
form = root.find_all('form')[form_index]
else:
form = root.find_all('form', id=form_id)[form_index]
# Construct base url
form_action = form.get('action')
if "://" in form_action:
url = form_action
elif form_action.startswith('/'):
url = '/'.join(response.url.split('/', 3)[:3]) + form_action
else:
url = response.url.rsplit('/', 1)[0] + '/' + form_action
# Identify payload format
fmt = None
form_method = form.get('method').lower()
if form_method == 'get':
fmt = 'get' # get(url, params=payload)
elif form_method == 'post':
if 'enctype' in form.attrs:
if form.attrs['enctype'] == 'multipart/form-data':
fmt = 'multipart/form-data' # post(url, files=payload)
elif form.attrs['enctype'] == 'application/x-www-form-urlencoded':
fmt = 'application/x-www-form-urlencoded' # post(url, data=payload)
else:
raise Exception("enctype={0} is not supported!".format(form.attrs['enctype']))
else:
fmt = 'application/x-www-form-urlencoded' # post(url, data=payload)
# Extract payload from form
payload = []
for form_elem in form.find_all(['input', 'select', 'textarea']):
value = None
is_file = False
tag_name = form_elem.name
key = form_elem.get('name')
if tag_name == 'input':
is_file = (form_elem.get('type') == 'file')
value = form_elem.get('value')
if form_elem.get('type') in ['checkbox', 'radio']:
if form_elem.has_attr('checked'):
if not value:
value = 'on'
else:
value = None
elif tag_name == 'select':
if form_elem.get('multiple') is not None:
value = []
if form_elem.select('option[value]'):
for option in form_elem.select('option[value]'):
if option.get('selected') is not None:
value.append(option.get('value'))
else:
for option in form_elem.select('option'):
if option.get('selected') is not None:
# bs4 NavigableString types have bad,
# undesirable properties that result
# in recursion errors when caching
value.append(str(option.string))
else:
if form_elem.select('option[value]'):
for option in form_elem.select('option[value]'):
if option.get('selected') is not None:
value = option.get('value')
# select the first option field if none is selected
if value is None:
value = form_elem.select(
'option[value]')[0].get('value')
else:
# survey form just uses text, not value
for option in form_elem.select('option'):
if option.get('selected') is not None:
value = str(option.string)
# select the first option field if none is selected
if value is None:
value = str(form_elem.select('option')[0].string)
if key in inputs:
if isinstance(inputs[key], list):
# list input is accepted (for array uploads)
value = inputs[key]
else:
value = str(inputs[key])
if (key is not None): # and (value is not None):
if fmt == 'multipart/form-data':
if is_file:
payload.append(
(key, ('', '', 'application/octet-stream')))
else:
if type(value) is list:
for v in value:
entry = (key, ('', v))
# Prevent redundant key, value pairs
# (can happen if the form repeats them)
if entry not in payload:
payload.append(entry)
elif value is None:
entry = (key, ('', ''))
if entry not in payload:
payload.append(entry)
else:
entry = (key, ('', value))
if entry not in payload:
payload.append(entry)
else:
if type(value) is list:
for v in value:
entry = (key, v)
if entry not in payload:
payload.append(entry)
else:
entry = (key, value)
if entry not in payload:
payload.append(entry)
# for future debugging
self._payload = payload
log.debug("Form: payload={0}".format(payload))
if method is not None:
fmt = method
log.debug("Method/format = {0}".format(fmt))
# Send payload
if fmt == 'get':
response = self._request("GET", url, params=payload, cache=cache)
elif fmt == 'multipart/form-data':
response = self._request("POST", url, files=payload, cache=cache)
elif fmt == 'application/x-www-form-urlencoded':
response = self._request("POST", url, data=payload, cache=cache)
return response
def _login(self, username=None, store_password=False,
reenter_password=False):
"""
Login to the ESO User Portal.
Parameters
----------
username : str, optional
Username to the ESO Public Portal. If not given, it should be
specified in the config file.
store_password : bool, optional
Stores the password securely in your keyring. Default is False.
reenter_password : bool, optional
Asks for the password even if it is already stored in the
keyring. This is the way to overwrite an already stored passwork
on the keyring. Default is False.
"""
if username is None:
if self.USERNAME != "":
username = self.USERNAME
elif self.username is not None:
username = self.username
else:
raise LoginError("If you do not pass a username to login(), "
"you should configure a default one!")
else:
# store username as we may need it to re-authenticate
self.username = username
# Get password from keyring or prompt
password, password_from_keyring = self._get_password(
"astroquery:www.eso.org", username, reenter=reenter_password)
# Authenticate
log.info("Authenticating {0} on www.eso.org...".format(username))
# Do not cache pieces of the login process
login_response = self._request("GET", "https://www.eso.org/sso/login",
cache=False)
root = BeautifulSoup(login_response.content, 'html5lib')
login_input = root.find(name='input', attrs={'name': 'execution'})
if login_input is None:
raise ValueError("ESO login page did not have the correct attributes.")
execution = login_input.get('value')
login_result_response = self._request("POST", "https://www.eso.org/sso/login",
data={'username': username,
'password': password,
'execution': execution,
'_eventId': 'submit',
'geolocation': '',
})
login_result_response.raise_for_status()
root = BeautifulSoup(login_result_response.content, 'html5lib')
authenticated = root.find('h4').text == 'Login successful'
if authenticated:
log.info("Authentication successful!")
else:
log.exception("Authentication failed!")
# When authenticated, save password in keyring if needed
if authenticated and password_from_keyring is None and store_password:
keyring.set_password("astroquery:www.eso.org", username, password)
return authenticated
[docs] def list_instruments(self, cache=True):
""" List all the available instrument-specific queries offered by the ESO archive.
Returns
-------
instrument_list : list of strings
cache : bool
Cache the response for faster subsequent retrieval
"""
if self._instrument_list is None:
url = "http://archive.eso.org/cms/eso-data/instrument-specific-query-forms.html"
instrument_list_response = self._request("GET", url, cache=cache)
root = BeautifulSoup(instrument_list_response.content, 'html5lib')
self._instrument_list = []
for element in root.select("div[id=col3] a[href]"):
href = element.attrs["href"]
if u"http://archive.eso.org/wdb/wdb/eso" in href:
instrument = href.split("/")[-2]
if instrument not in self._instrument_list:
self._instrument_list.append(instrument)
return self._instrument_list
[docs] def list_surveys(self, cache=True):
""" List all the available surveys (phase 3) in the ESO archive.
Returns
-------
survey_list : list of strings
cache : bool
Cache the response for faster subsequent retrieval
"""
if self._survey_list is None:
survey_list_response = self._request(
"GET", "http://archive.eso.org/wdb/wdb/adp/phase3_main/form",
cache=cache)
root = BeautifulSoup(survey_list_response.content, 'html5lib')
self._survey_list = []
collections_table = root.find('table', id='collections_table')
other_collections = root.find('select', id='collection_name_option')
# it is possible to have empty collections or other collections...
collection_elts = (collections_table.findAll('input', type='checkbox')
if collections_table is not None
else [])
other_elts = (other_collections.findAll('option')
if other_collections is not None
else [])
for element in (collection_elts + other_elts):
if 'value' in element.attrs:
survey = element.attrs['value']
if survey and survey not in self._survey_list and 'Any' not in survey:
self._survey_list.append(survey)
return self._survey_list
[docs] def query_surveys(self, surveys='', cache=True,
help=False, open_form=False, **kwargs):
"""
Query survey Phase 3 data contained in the ESO archive.
Parameters
----------
survey : string or list
Name of the survey(s) to query. Should beone or more of the
names returned by `~astroquery.eso.EsoClass.list_surveys`. If
specified as a string, should be a comma-separated list of
survey names.
cache : bool
Cache the response for faster subsequent retrieval
Returns
-------
table : `~astropy.table.Table` or `None`
A table representing the data available in the archive for the
specified survey, matching the constraints specified in ``kwargs``.
The number of rows returned is capped by the ROW_LIMIT
configuration item. `None` is returned when the query has no
results.
"""
url = "http://archive.eso.org/wdb/wdb/adp/phase3_main/form"
if open_form:
webbrowser.open(url)
elif help:
self._print_surveys_help(url, cache=cache)
else:
survey_form = self._request("GET", url, cache=cache)
query_dict = kwargs
query_dict["wdbo"] = "csv/download"
if isinstance(surveys, str):
surveys = surveys.split(",")
query_dict['collection_name'] = surveys
if self.ROW_LIMIT >= 0:
query_dict["max_rows_returned"] = int(self.ROW_LIMIT)
else:
query_dict["max_rows_returned"] = 10000
survey_response = self._activate_form(survey_form, form_index=0,
form_id='queryform',
inputs=query_dict, cache=cache)
content = survey_response.content
# First line is always garbage
content = content.split(b'\n', 1)[1]
log.debug("Response content:\n{0}".format(content))
if _check_response(content):
table = Table.read(BytesIO(content), format="ascii.csv",
comment="^#")
return table
else:
warnings.warn("Query returned no results", NoResultsWarning)
[docs] def query_main(self, column_filters={}, columns=[],
open_form=False, help=False, cache=True, **kwargs):
"""
Query raw data contained in the ESO archive.
Parameters
----------
column_filters : dict
Constraints applied to the query.
columns : list of strings
Columns returned by the query.
open_form : bool
If `True`, opens in your default browser the query form
for the requested instrument.
help : bool
If `True`, prints all the parameters accepted in
``column_filters`` and ``columns`` for the requested
``instrument``.
cache : bool
Cache the response for faster subsequent retrieval.
Returns
-------
table : `~astropy.table.Table`
A table representing the data available in the archive for the
specified instrument, matching the constraints specified in
``kwargs``. The number of rows returned is capped by the
ROW_LIMIT configuration item.
"""
url = self.QUERY_INSTRUMENT_URL+"/eso_archive_main/form"
return self._query(url, column_filters=column_filters, columns=columns,
open_form=open_form, help=help, cache=cache, **kwargs)
[docs] def query_instrument(self, instrument, column_filters={}, columns=[],
open_form=False, help=False, cache=True, **kwargs):
"""
Query instrument-specific raw data contained in the ESO archive.
Parameters
----------
instrument : string
Name of the instrument to query, one of the names returned by
`~astroquery.eso.EsoClass.list_instruments`.
column_filters : dict
Constraints applied to the query.
columns : list of strings
Columns returned by the query.
open_form : bool
If `True`, opens in your default browser the query form
for the requested instrument.
help : bool
If `True`, prints all the parameters accepted in
``column_filters`` and ``columns`` for the requested
``instrument``.
cache : bool
Cache the response for faster subsequent retrieval.
Returns
-------
table : `~astropy.table.Table`
A table representing the data available in the archive for the
specified instrument, matching the constraints specified in
``kwargs``. The number of rows returned is capped by the
ROW_LIMIT configuration item.
"""
url = self.QUERY_INSTRUMENT_URL+'/{0}/form'.format(instrument.lower())
return self._query(url, column_filters=column_filters, columns=columns,
open_form=open_form, help=help, cache=cache, **kwargs)
def _query(self, url, column_filters={}, columns=[],
open_form=False, help=False, cache=True, **kwargs):
table = None
if open_form:
webbrowser.open(url)
elif help:
self._print_query_help(url)
else:
instrument_form = self._request("GET", url, cache=cache)
query_dict = {}
query_dict.update(column_filters)
# TODO: replace this with individually parsed kwargs
query_dict.update(kwargs)
query_dict["wdbo"] = "csv/download"
# Default to returning the DP.ID since it is needed for header
# acquisition
query_dict['tab_dp_id'] = kwargs.pop('tab_dp_id', 'on')
for k in columns:
query_dict["tab_" + k] = True
if self.ROW_LIMIT >= 0:
query_dict["max_rows_returned"] = int(self.ROW_LIMIT)
else:
query_dict["max_rows_returned"] = 10000
# used to be form 0, but now there's a new 'logout' form at the top
# (form_index = -1 and 0 both work now that form_id is included)
instrument_response = self._activate_form(instrument_form,
form_index=-1,
form_id='queryform',
inputs=query_dict,
cache=cache)
content = instrument_response.content
# First line is always garbage
content = content.split(b'\n', 1)[1]
log.debug("Response content:\n{0}".format(content))
if _check_response(content):
table = Table.read(BytesIO(content), format="ascii.csv",
comment='^#')
return table
else:
warnings.warn("Query returned no results", NoResultsWarning)
def _check_existing_files(self, datasets, continuation=False,
destination=None):
"""Detect already downloaded datasets."""
datasets_to_download = []
files = []
for dataset in datasets:
ext = os.path.splitext(dataset)[1].lower()
if ext in ('.fits', '.tar'):
local_filename = dataset
elif ext == '.fz':
local_filename = dataset[:-3]
elif ext == '.z':
local_filename = dataset[:-2]
else:
local_filename = dataset + ".fits"
if destination is not None:
local_filename = os.path.join(destination,
local_filename)
elif self.cache_location is not None:
local_filename = os.path.join(self.cache_location,
local_filename)
if os.path.exists(local_filename):
log.info("Found {0}.fits...".format(dataset))
if continuation:
datasets_to_download.append(dataset)
else:
files.append(local_filename)
elif os.path.exists(local_filename + ".Z"):
log.info("Found {0}.fits.Z...".format(dataset))
if continuation:
datasets_to_download.append(dataset)
else:
files.append(local_filename + ".Z")
elif os.path.exists(local_filename + ".fz"): # RICE-compressed
log.info("Found {0}.fits.fz...".format(dataset))
if continuation:
datasets_to_download.append(dataset)
else:
files.append(local_filename + ".fz")
else:
datasets_to_download.append(dataset)
return datasets_to_download, files
def _download_file(self, url, local_filepath, **kwargs):
""" Wraps QueryWithLogin._download_file to detect if the
authentication expired.
"""
trials = 1
while trials <= 2:
resp = super(EsoClass, self)._download_file(url, local_filepath,
**kwargs)
# trying to detect the failing authentication:
# - content type should not be html
if (resp.headers['Content-Type'] == 'text/html;charset=UTF-8' and
resp.url.startswith('https://www.eso.org/sso/login')):
if trials == 1:
log.warning("Session expired, trying to re-authenticate")
self.login()
trials += 1
else:
raise LoginError("Could not authenticate")
else:
break
return resp
[docs] def retrieve_data(self, datasets, continuation=False, destination=None,
with_calib='none', request_all_objects=False,
unzip=True, request_id=None):
"""
Retrieve a list of datasets form the ESO archive.
Parameters
----------
datasets : list of strings or string
List of datasets strings to retrieve from the archive.
destination: string
Directory where the files are copied.
Files already found in the destination directory are skipped,
unless continuation=True.
Default to astropy cache.
continuation : bool
Force the retrieval of data that are present in the destination
directory.
with_calib : string
Retrieve associated calibration files: 'none' (default), 'raw' for
raw calibrations, or 'processed' for processed calibrations.
request_all_objects : bool
When retrieving associated calibrations (``with_calib != 'none'``),
this allows to request all the objects included the already
downloaded ones, to be sure to retrieve all calibration files.
This is useful when the download was interrupted. `False` by
default.
unzip : bool
Unzip compressed files from the archive after download. `True` by
default.
request_id : str, int
Retrieve from an existing request number rather than sending a new
query, with the identifier from the URL in the email sent from
the archive from the earlier request as in:
https://dataportal.eso.org/rh/requests/[USERNAME]/[request_id]
Returns
-------
files : list of strings or string
List of files that have been locally downloaded from the archive.
Examples
--------
>>> dptbl = Eso.query_instrument('apex', pi_coi='ginsburg')
>>> dpids = [row['DP.ID'] for row in dptbl if 'Map' in row['Object']]
>>> files = Eso.retrieve_data(dpids)
"""
calib_options = {'none': '', 'raw': 'CalSelectorRaw2Raw',
'processed': 'CalSelectorRaw2Master'}
if with_calib not in calib_options:
raise ValueError("invalid value for 'with_calib', "
"it must be 'none', 'raw' or 'processed'")
if isinstance(datasets, str):
return_list = False
datasets = [datasets]
else:
return_list = True
if not isinstance(datasets, (list, tuple, np.ndarray)):
raise TypeError("Datasets must be given as a list of strings.")
# First: Detect datasets already downloaded
if with_calib != 'none' and request_all_objects:
datasets_to_download, files = list(datasets), []
else:
log.info("Detecting already downloaded datasets...")
datasets_to_download, files = self._check_existing_files(
datasets, continuation=continuation, destination=destination)
# Second: Check that the datasets to download are in the archive
if request_id is None:
log.info("Checking availability of datasets to download...")
valid_datasets = [self.verify_data_exists(ds)
for ds in datasets_to_download]
else:
# Assume all valid if a request_id was provided
valid_datasets = [(ds, True) for ds in datasets_to_download]
if not all(valid_datasets):
invalid_datasets = [ds for ds, v in zip(datasets_to_download,
valid_datasets) if not v]
raise ValueError("The following data sets were not found on the "
"ESO servers: {0}".format(invalid_datasets))
# Third: Download the other datasets
log.info("Downloading datasets...")
if datasets_to_download:
if not self.authenticated():
self.login()
url = "http://archive.eso.org/cms/eso-data/eso-data-direct-retrieval.html"
with suspend_cache(self): # Never cache staging operations
if request_id is None:
log.info("Contacting retrieval server...")
retrieve_data_form = self._request("GET", url,
cache=False)
retrieve_data_form.raise_for_status()
log.info("Staging request...")
inputs = {"list_of_datasets": "\n".join(datasets_to_download)}
data_confirmation_form = self._activate_form(
retrieve_data_form, form_index=-1, inputs=inputs,
cache=False)
data_confirmation_form.raise_for_status()
root = BeautifulSoup(data_confirmation_form.content,
'html5lib')
login_button = root.select('input[value=LOGIN]')
if login_button:
raise LoginError("Not logged in. "
"You must be logged in to download data.")
inputs = {}
if with_calib != 'none':
inputs['requestCommand'] = calib_options[with_calib]
# TODO: There may be another screen for Not Authorized;
# that should be included too
# form name is "retrieve"; no id
data_download_form = self._activate_form(
data_confirmation_form, form_index=-1, inputs=inputs,
cache=False)
else:
# Build URL by hand
request_url = 'https://dataportal.eso.org/rh/requests/'
request_url += f'{self.USERNAME}/{request_id}'
data_download_form = self._request("GET", request_url,
cache=False)
_content = data_download_form.content.decode('utf-8')
if ('Request Handler - Error' in _content):
# Likely a problem with the request_url
msg = (f"The form at {request_url} returned an error."
" See your recent requests at "
"https://dataportal.eso.org/rh/requests/"
f"{self.USERNAME}/recentRequests")
raise RemoteServiceError(msg)
log.info("Staging form is at {0}"
.format(data_download_form.url))
root = BeautifulSoup(data_download_form.content, 'html5lib')
state = root.select('span[id=requestState]')[0].text
t0 = time.time()
while state not in ('COMPLETE', 'ERROR'):
time.sleep(2.0)
data_download_form = self._request("GET",
data_download_form.url,
cache=False)
root = BeautifulSoup(data_download_form.content,
'html5lib')
state = root.select('span[id=requestState]')[0].text
print("{0:20.0f}s elapsed"
.format(time.time() - t0), end='\r')
sys.stdout.flush()
if state == 'ERROR':
raise RemoteServiceError("There was a remote service "
"error; perhaps the requested "
"file could not be found?")
if with_calib != 'none':
# when requested files with calibrations, some javascript is
# used to display the files, which prevent retrieving the files
# directly. So instead we retrieve the download script provided
# in the web page, and use it to extract the list of files.
# The benefit of this is also that in the download script the
# list of files is de-duplicated, whereas on the web page the
# calibration files would be duplicated for each exposure.
link = root.select('a[href$="/script"]')[0]
if 'downloadRequest' not in link.text:
# Make sure that we found the correct link
raise RemoteServiceError(
"A link was found in the download file for the "
"calibrations that is not a downloadRequest link "
"and therefore appears invalid.")
href = link.attrs['href']
script = self._request("GET", href, cache=False)
fileLinks = re.findall(
r'"(https://dataportal.eso.org/dataPortal/api/requests/.*)"',
script.text)
# urls with api/ require using Basic Authentication, though
# it's easier for us to reuse the existing requests session (to
# avoid asking agin for a username/password if it is not
# stored). So we remove api/ from the urls:
fileLinks = [
f.replace('https://dataportal.eso.org/dataPortal/api/requests',
'https://dataportal.eso.org/dataPortal/requests')
for f in fileLinks]
log.info("Detecting already downloaded datasets, "
"including calibrations...")
fileIds = [f.rsplit('/', maxsplit=1)[1] for f in fileLinks]
filteredIds, files = self._check_existing_files(
fileIds, continuation=continuation,
destination=destination)
fileLinks = [f for f, fileId in zip(fileLinks, fileIds)
if fileId in filteredIds]
else:
fileIds = root.select('input[name=fileId]')
fileLinks = ["http://dataportal.eso.org/dataPortal" +
fileId.attrs['value'].split()[1]
for fileId in fileIds]
nfiles = len(fileLinks)
log.info("Downloading {} files...".format(nfiles))
log.debug("Files:\n{}".format('\n'.join(fileLinks)))
for i, fileLink in enumerate(fileLinks, 1):
fileId = fileLink.rsplit('/', maxsplit=1)[1]
if request_id is not None:
# Since we fetched the script directly without sending
# a new request, check here that the file in the list
# is among those requested in the input list
if fileId.split('.fits')[0] not in datasets_to_download:
continue
log.info("Downloading file {}/{}: {}..."
.format(i, nfiles, fileId))
filename = self._request("GET", fileLink, save=True,
continuation=True)
if filename.endswith(('.gz', '.7z', '.bz2', '.xz', '.Z')) and unzip:
log.info("Unzipping file {0}...".format(fileId))
filename = system_tools.gunzip(filename)
if destination is not None:
log.info("Copying file {0} to {1}...".format(fileId, destination))
destfile = os.path.join(destination, os.path.basename(filename))
shutil.move(filename, destfile)
files.append(destfile)
else:
files.append(filename)
# Empty the redirect cache of this request session
# Only available and needed for requests versions < 2.17
try:
self._session.redirect_cache.clear()
except AttributeError:
pass
log.info("Done!")
if (not return_list) and (len(files) == 1):
files = files[0]
return files
[docs] def verify_data_exists(self, dataset):
"""
Given a data set name, return 'True' if ESO has the file and 'False'
otherwise
"""
url = 'http://archive.eso.org/wdb/wdb/eso/eso_archive_main/query'
payload = {'dp_id': dataset,
'ascii_out_mode': 'true',
}
# Never cache this as it is verifying the existence of remote content
response = self._request("POST", url, params=payload, cache=False)
content = response.text
return 'No data returned' not in content
[docs] def query_apex_quicklooks(self, project_id=None, help=False,
open_form=False, cache=True, **kwargs):
"""
APEX data are distributed with quicklook products identified with a
different name than other ESO products. This query tool searches by
project ID or any other supported keywords.
Examples
--------
>>> tbl = Eso.query_apex_quicklooks('093.C-0144')
>>> files = Eso.retrieve_data(tbl['Product ID'])
"""
apex_query_url = 'http://archive.eso.org/wdb/wdb/eso/apex_product/form'
table = None
if open_form:
webbrowser.open(apex_query_url)
elif help:
return self._print_instrument_help(apex_query_url, 'apex')
else:
payload = {'wdbo': 'csv/download'}
if project_id is not None:
payload['prog_id'] = project_id
payload.update(kwargs)
apex_form = self._request("GET", apex_query_url, cache=cache)
apex_response = self._activate_form(
apex_form, form_id='queryform', form_index=0, inputs=payload,
cache=cache, method='application/x-www-form-urlencoded')
content = apex_response.content
if _check_response(content):
# First line is always garbage
content = content.split(b'\n', 1)[1]
try:
table = Table.read(BytesIO(content), format="ascii.csv",
guess=False, # header_start=1,
comment="#", encoding='utf-8')
except ValueError as ex:
if 'the encoding parameter is not supported on Python 2' in str(ex):
# astropy python2 does not accept the encoding parameter
table = Table.read(BytesIO(content), format="ascii.csv",
guess=False,
comment="#")
else:
raise ex
else:
raise RemoteServiceError("Query returned no results")
return table
def _print_query_help(self, url, cache=True):
"""
Download a form and print it in a quasi-human-readable way
"""
log.info("List of accepted column_filters parameters.")
log.info("The presence of a column in the result table can be "
"controlled if prefixed with a [ ] checkbox.")
log.info("The default columns in the result table are shown as "
"already ticked: [x].")
result_string = []
resp = self._request("GET", url, cache=cache)
doc = BeautifulSoup(resp.content, 'html5lib')
form = doc.select("html body form pre")[0]
# Unwrap all paragraphs
paragraph = form.find('p')
while paragraph:
paragraph.unwrap()
paragraph = form.find('p')
# For all sections
for section in form.select("table"):
section_title = "".join(section.stripped_strings)
section_title = "\n".join(["", section_title,
"-" * len(section_title)])
result_string.append(section_title)
checkbox_name = ""
checkbox_value = ""
for tag in section.next_siblings:
if tag.name == u"table":
break
elif tag.name == u"input":
if tag.get(u'type') == u"checkbox":
checkbox_name = tag['name']
checkbox_value = u"[x]" if ('checked' in tag.attrs) else u"[ ]"
name = ""
value = ""
else:
name = tag['name']
value = ""
elif tag.name == u"select":
options = []
for option in tag.select("option"):
options += ["{0} ({1})"
.format(option['value'],
"".join(option.stripped_strings))]
name = tag[u"name"]
value = ", ".join(options)
else:
name = ""
value = ""
if u"tab_" + name == checkbox_name:
checkbox = checkbox_value
else:
checkbox = " "
if name != u"":
result_string.append("{0} {1}: {2}"
.format(checkbox, name, value))
print("\n".join(result_string))
return result_string
def _print_surveys_help(self, url, cache=True):
"""
Download a form and print it in a quasi-human-readable way
"""
log.info("List of the parameters accepted by the "
"surveys query.")
log.info("The presence of a column in the result table can be "
"controlled if prefixed with a [ ] checkbox.")
log.info("The default columns in the result table are shown as "
"already ticked: [x].")
result_string = []
resp = self._request("GET", url, cache=cache)
doc = BeautifulSoup(resp.content, 'html5lib')
form = doc.select("html body form")[0]
# hovertext from different labels are used to give more info on forms
helptext_dict = {abbr['title'].split(":")[0].strip():
":".join(abbr['title'].split(":")[1:])
for abbr in form.findAll('abbr')
if 'title' in abbr.attrs and ":" in abbr['title']}
for fieldset in form.select('fieldset'):
legend = fieldset.select('legend')
if len(legend) > 1:
raise ValueError("Form parsing error: too many legends.")
elif len(legend) == 0:
continue
section_title = "\n\n"+"".join(legend[0].stripped_strings)+"\n"
result_string.append(section_title)
for section in fieldset.select('table'):
checkbox_name = ""
checkbox_value = ""
for tag in section.next_elements:
if tag.name == u"table":
break
elif tag.name == u"input":
if tag.get(u'type') == u"checkbox":
checkbox_name = tag['name']
checkbox_value = (u"[x]"
if ('checked' in tag.attrs)
else u"[ ]")
name = ""
value = ""
else:
name = tag['name']
value = ""
elif tag.name == u"select":
options = []
for option in tag.select("option"):
options += ["{0} ({1})"
.format(option['value']
if 'value' in option
else "",
"".join(option.stripped_strings))]
name = tag[u"name"]
value = ", ".join(options)
else:
name = ""
value = ""
if u"tab_" + name == checkbox_name:
checkbox = checkbox_value
else:
checkbox = " "
if name != u"":
result_string.append("{0} {1}: {2}"
.format(checkbox, name, value))
if name.strip() in helptext_dict:
result_string.append(helptext_dict[name.strip()])
print("\n".join(result_string))
return result_string
Eso = EsoClass()