# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Simbad query class for accessing the Simbad Service
"""
import copy
import re
import requests
import json
import os
from collections import namedtuple
from io import BytesIO
import warnings
import astropy.units as u
from astropy.utils.data import get_pkg_data_filename
import astropy.coordinates as coord
from astropy.table import Table
import astropy.io.votable as votable
from astroquery.query import BaseQuery
from astroquery.utils import commons, async_to_sync
from astroquery.exceptions import TableParseError, LargeQueryWarning
from . import conf
__all__ = ['Simbad', 'SimbadClass', 'SimbadBaseQuery']
def validate_epoch(value):
pattern = re.compile(r'^[JB]\d+[.]?\d+$', re.IGNORECASE)
if pattern.match(value) is None:
raise ValueError("Epoch must be specified as [J|B]<epoch>.\n"
"Example: epoch='J2000'")
return value
def validate_equinox(value):
try:
return float(value)
except (ValueError, TypeError):
raise ValueError("Equinox must be a number")
def validate_epoch_decorator(func):
"""
A method decorator that checks if the epoch value entered by the user
is acceptable.
"""
def wrapper(*args, **kwargs):
if kwargs.get('epoch'):
value = kwargs['epoch']
validate_epoch(value)
return func(*args, **kwargs)
return wrapper
def validate_equinox_decorator(func):
"""
A method decorator that checks if the equinox value entered by the user
is acceptable.
"""
def wrapper(*args, **kwargs):
if kwargs.get('equinox'):
value = kwargs['equinox']
validate_equinox(value)
return func(*args, **kwargs)
return wrapper
def strip_field(field, keep_filters=False):
"""Helper tool: remove parameters from VOTABLE fields
However, this should only be applied to a subset of VOTABLE fields:
* ra
* dec
* otype
* id
* coo
* bibcodelist
*if* keep_filters is specified
"""
if '(' in field:
root = field[:field.find('(')]
if (root in ('ra', 'dec', 'otype', 'id', 'coo', 'bibcodelist') or
not keep_filters):
return root
# the overall else (default option)
return field
error_regex = re.compile(r'(?ms)\[(?P<line>\d+)\]\s?(?P<msg>.+?)(\[|\Z)')
SimbadError = namedtuple('SimbadError', ('line', 'msg'))
VersionInfo = namedtuple('VersionInfo', ('major', 'minor', 'micro', 'patch'))
class SimbadResult:
__sections = ('script', 'console', 'error', 'data')
def __init__(self, txt, verbose=False):
self.__txt = txt
self.__stringio = None
self.__indexes = {}
self.verbose = verbose
self.exectime = None
self.sim_version = None
self.__split_sections()
self.__parse_console_section()
self.__warn()
def __split_sections(self):
for section in self.__sections:
match = re.search(r'(?ims)^::%s:+?\r?$(?P<content>.*?)(^::|\Z)' %
section, self.__txt)
if match:
self.__indexes[section] = (match.start('content'),
match.end('content'))
def __parse_console_section(self):
if self.console is None:
return
match = re.search(r'(?ims)total execution time: ([.\d]+?)\s*?secs',
self.console)
if match:
self.exectime = float(match.group(1))
match = re.search(r'(?ms)SIMBAD(\d) rel (\d)[.](\d+)([^\d^\s])?',
self.console)
if match:
self.sim_version = VersionInfo(*match.groups(None))
def __warn(self):
for error in self.errors:
warnings.warn("Warning: The script line number %i raised "
"an error (recorded in the `errors` attribute "
"of the result table): %s" %
(error.line, error.msg))
def __get_section(self, section_name):
if section_name in self.__indexes:
return self.__txt[self.__indexes[section_name][0]:
self.__indexes[section_name][1]].strip()
@property
def script(self):
return self.__get_section('script')
@property
def console(self):
return self.__get_section('console')
@property
def error_raw(self):
return self.__get_section('error')
@property
def data(self):
return self.__get_section('data')
@property
def errors(self):
result = []
if self.error_raw is None:
return result
for err in error_regex.finditer(self.error_raw):
result.append(SimbadError(int(err.group('line')),
err.group('msg').replace('\n', ' ')))
return result
@property
def nb_errors(self):
if self.error_raw is None:
return 0
return len(self.errors)
class SimbadVOTableResult(SimbadResult):
"""VOTable-type Simbad result"""
def __init__(self, txt, verbose=False, pedantic=False):
self.__pedantic = pedantic
self.__table = None
if not verbose:
commons.suppress_vo_warnings()
super().__init__(txt, verbose=verbose)
@property
def table(self):
if self.__table is None:
self.bytes = BytesIO(self.data.encode('utf8'))
tbl = votable.parse_single_table(self.bytes, verify='warn')
self.__table = tbl.to_table()
self.__table.convert_bytestring_to_unicode()
return self.__table
bibcode_regex = re.compile(r'query\s+bibcode\s+(wildcard)?\s+([\w]*)')
class SimbadBibcodeResult(SimbadResult):
"""Bibliography-type Simbad result"""
@property
def table(self):
bibcode_match = bibcode_regex.search(self.script)
splitter = bibcode_match.group(2)
ref_list = [splitter + ref for ref in self.data.split(splitter)][1:]
max_len = max([len(r) for r in ref_list])
table = Table(names=['References'], dtype=['U%i' % max_len])
for ref in ref_list:
table.add_row([ref])
return table
class SimbadObjectIDsResult(SimbadResult):
"""Object identifier list Simbad result"""
@property
def table(self):
max_len = max([len(i) for i in self.data.splitlines()])
table = Table(names=['ID'], dtype=['S%i' % max_len])
for id in self.data.splitlines():
table.add_row([id.strip()])
return table
[docs]class SimbadBaseQuery(BaseQuery):
"""
SimbadBaseQuery overloads the base query because we know that SIMBAD will
sometimes blacklist users for exceeding rate limits. This warning results
in a "connection refused" error (error 61) instead of a more typical "error
8" that you would get from not having an internet connection at all.
"""
def _request(self, *args, **kwargs):
try:
response = super()._request(*args, **kwargs)
except requests.exceptions.ConnectionError as ex:
if 'Errno 61' in str(ex):
extratext = ("\n\n"
"************************* \n"
"ASTROQUERY ADDED WARNING: \n"
"************************* \n"
"Error 61 received from SIMBAD server. "
"This may indicate that you have been "
"blacklisted for exceeding the query rate limit."
" See the astroquery SIMBAD documentation. "
"Blacklists are generally cleared after ~1 hour. "
"Please reconsider your approach, you may want "
"to use vectorized queries."
)
ex.args[0].args = (ex.args[0].args[0] + extratext,)
raise ex
if response.status_code == 403:
errmsg = ("Error 403: Forbidden. You may get this error if you "
"exceed the SIMBAD server's rate limits. Try again in "
"a few seconds or minutes.")
raise requests.exceptions.HTTPError(errmsg)
else:
response.raise_for_status()
return response
[docs]@async_to_sync
class SimbadClass(SimbadBaseQuery):
"""
The class for querying the Simbad web service.
Note that SIMBAD suggests submitting no more than 6 queries per second; if
you submit more than that, your IP may be temporarily blacklisted
(http://simbad.u-strasbg.fr/simbad/sim-help?Page=sim-url)
"""
SIMBAD_URL = 'http://' + conf.server + '/simbad/sim-script'
TIMEOUT = conf.timeout
WILDCARDS = {
'*': 'Any string of characters (including an empty one)',
'?': 'Any character (exactly one character)',
'[abc]': ('Exactly one character taken in the list. '
'Can also be defined by a range of characters: [A-Z]'
),
'[^0-9]': 'Any (one) character not in the list.'}
_ORDERED_WILDCARDS = ['*', '?', '[abc]', '[^0-9]']
# query around not included since this is a subcase of query_region
_function_to_command = {
'query_object_async': 'query id',
'query_region_async': 'query coo',
'query_catalog_async': 'query cat',
'query_criteria_async': 'query sample',
'query_bibcode_async': 'query bibcode',
'query_bibobj_async': 'query bibobj'
}
ROW_LIMIT = conf.row_limit
# also find a way to fetch the votable fields table from
# <http://simbad.u-strasbg.fr/simbad/sim-help?Page=sim-fscript#VotableFields>
# tried something for this in this ipython nb
# <http://nbviewer.ipython.org/5851110>
_VOTABLE_FIELDS = ['main_id', 'coordinates']
def __init__(self):
super().__init__()
self._VOTABLE_FIELDS = copy.copy(self._VOTABLE_FIELDS)
[docs] def list_wildcards(self):
"""
Displays the available wildcards that may be used in Simbad queries and
their usage.
Examples
--------
>>> from astroquery.simbad import Simbad
>>> Simbad.list_wildcards()
* : Any string of characters (including an empty one)...
[^0-9] : Any (one) character not in the list.
? : Any character (exactly one character)
[abc] : Exactly one character taken in the list.
Can also be defined by a range of characters: [A-Z]
"""
for key in self._ORDERED_WILDCARDS:
print("{key} : {value}\n".format(key=key,
value=self.WILDCARDS[key]))
return
[docs] def list_votable_fields(self):
"""
Lists all the fields that can be fetched for a VOTable.
Examples
--------
>>> from astroquery.simbad import Simbad
>>> Simbad.list_votable_fields()
--NOTES--...
"""
# display additional notes:
notes_file = get_pkg_data_filename(
os.path.join('data', 'votable_fields_notes.json'))
with open(notes_file, "r") as f:
notes = json.load(f)
print("--NOTES--\n")
for i, line in list(enumerate(notes)):
print("{lineno}. {msg}\n".format(lineno=i + 1, msg=line))
dict_file = get_pkg_data_filename(
os.path.join('data', 'votable_fields_dict.json'))
with open(dict_file, "r") as f:
fields_dict = json.load(f)
print("Available VOTABLE fields:\n")
for field in list(sorted(fields_dict.keys())):
print("{}".format(field))
print("For more information on a field:\n"
"Simbad.get_field_description ('field_name') \n"
"Currently active VOTABLE fields:\n {0}"
.format(self._VOTABLE_FIELDS))
[docs] def get_field_description(self, field_name):
"""
Displays a description of the VOTable field.
Parameters
----------
field_name : str
the name of the field to describe. Must be one of those listed
by `list_votable_fields`.
Examples
--------
>>> from astroquery.simbad import Simbad
>>> Simbad.get_field_description('main_id')
main identifier of an astronomical object. It is the same as id(1)
>>> Simbad.get_field_description('bibcodelist(y1-y2)')
number of references. The parameter is optional and limit the count to
the references between the years y1 and y2
"""
# first load the dictionary from json
dict_file = get_pkg_data_filename(
os.path.join('data', 'votable_fields_dict.json'))
with open(dict_file, "r") as f:
fields_dict = json.load(f)
try:
print(fields_dict[field_name])
except KeyError:
raise KeyError("No such field_name")
[docs] def get_votable_fields(self):
"""
Display votable fields
Examples
--------
>>> from astroquery.simbad import Simbad
>>> Simbad.get_votable_fields()
['main_id', 'coordinates']
"""
return self._VOTABLE_FIELDS
[docs] def add_votable_fields(self, *args):
"""
Sets fields to be fetched in the VOTable. Must be one of those listed
by `list_votable_fields`.
Parameters
----------
list of field_names
"""
dict_file = get_pkg_data_filename(
os.path.join('data', 'votable_fields_dict.json'))
with open(dict_file, "r") as f:
fields_dict = json.load(f)
fields_dict = dict(
((strip_field(ff), fields_dict[ff])
for ff in fields_dict))
for field in args:
sf = strip_field(field)
if sf not in fields_dict:
raise KeyError("{field}: no such field".format(field=field))
else:
self._VOTABLE_FIELDS.append(field)
[docs] def remove_votable_fields(self, *args, **kwargs):
"""
Removes the specified field names from ``SimbadClass._VOTABLE_FIELDS``
Parameters
----------
list of field_names to be removed
strip_params: bool
If true, strip the specified keywords before removing them:
e.g., ra(foo) would remove ra(bar) if this is True
"""
strip_params = kwargs.pop('strip_params', False)
if strip_params:
sargs = [strip_field(a) for a in args]
sfields = [strip_field(a) for a in self._VOTABLE_FIELDS]
else:
sargs = args
sfields = self._VOTABLE_FIELDS
absent_fields = set(sargs) - set(sfields)
for b, f in list(zip(sfields, self._VOTABLE_FIELDS)):
if b in sargs:
self._VOTABLE_FIELDS.remove(f)
for field in absent_fields:
warnings.warn("{field}: this field is not set".format(field=field))
# check if all fields are removed
if not self._VOTABLE_FIELDS:
warnings.warn("All fields have been removed. "
"Resetting to defaults.")
self.reset_votable_fields()
[docs] def reset_votable_fields(self):
"""
resets VOTABLE_FIELDS to defaults
"""
self._VOTABLE_FIELDS = ['main_id', 'coordinates']
[docs] def query_criteria(self, *args, **kwargs):
"""
Query SIMBAD based on any criteria.
Parameters
----------
args:
String arguments passed directly to SIMBAD's script
(e.g., 'region(box, GAL, 10.5 -10.5, 0.5d 0.5d)')
kwargs:
Keyword / value pairs passed to SIMBAD's script engine
(e.g., {'otype':'SNR'} will be rendered as otype=SNR)
Returns
-------
table : `~astropy.table.Table`
Query results table
"""
verbose = kwargs.pop('verbose', False)
result = self.query_criteria_async(*args, **kwargs)
return self._parse_result(result, SimbadVOTableResult, verbose=verbose)
[docs] def query_criteria_async(self, *args, **kwargs):
"""
Query SIMBAD based on any criteria.
Parameters
----------
args:
String arguments passed directly to SIMBAD's script
(e.g., 'region(box, GAL, 10.5 -10.5, 0.5d 0.5d)')
kwargs:
Keyword / value pairs passed to SIMBAD's script engine
(e.g., {'otype':'SNR'} will be rendered as otype=SNR)
cache : bool
Cache the query?
Returns
-------
response : `requests.Response`
Response of the query from the server
"""
cache = kwargs.pop('cache', True)
request_payload = self._args_to_payload(caller='query_criteria_async',
*args, **kwargs)
response = self._request("POST", self.SIMBAD_URL, data=request_payload,
timeout=self.TIMEOUT, cache=cache)
return response
[docs] def query_object(self, object_name, wildcard=False, verbose=False,
get_query_payload=False):
"""
Queries Simbad for the given object and returns the result as a
`~astropy.table.Table`. Object names may also be specified with
wildcard. See examples below.
Parameters
----------
object_name : str
name of object to be queried
wildcard : boolean, optional
When it is set to `True` it implies that the object is specified
with wildcards. Defaults to `False`.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
table : `~astropy.table.Table`
Query results table
"""
response = self.query_object_async(object_name, wildcard=wildcard,
get_query_payload=get_query_payload)
if get_query_payload:
return response
return self._parse_result(response, SimbadVOTableResult,
verbose=verbose)
[docs] def query_object_async(self, object_name, wildcard=False, cache=True,
get_query_payload=False):
"""
Serves the same function as `query_object`, but
only collects the response from the Simbad server and returns.
Parameters
----------
object_name : str
name of object to be queried
wildcard : boolean, optional
When it is set to `True` it implies that the object is specified
with wildcards. Defaults to `False`.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
response : `requests.Response`
Response of the query from the server
"""
request_payload = self._args_to_payload(object_name, wildcard=wildcard,
caller='query_object_async')
if get_query_payload:
return request_payload
response = self._request("POST", self.SIMBAD_URL, data=request_payload,
timeout=self.TIMEOUT, cache=cache)
return response
[docs] def query_objects(self, object_names, wildcard=False, verbose=False,
get_query_payload=False):
"""
Queries Simbad for the specified list of objects and returns the
results as a `~astropy.table.Table`. Object names may be specified
with wildcards if desired.
Parameters
----------
object_names : sequence of strs
names of objects to be queried
wildcard : boolean, optional
When `True`, the names may have wildcards in them. Defaults to
`False`.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
table : `~astropy.table.Table`
Query results table
"""
return self.query_object('\n'.join(object_names), wildcard=wildcard,
get_query_payload=get_query_payload)
[docs] def query_objects_async(self, object_names, wildcard=False, cache=True,
get_query_payload=False):
"""
Same as `query_objects`, but only collects the response from the
Simbad server and returns.
Parameters
----------
object_names : sequence of strs
names of objects to be queried
wildcard : boolean, optional
When `True`, the names may have wildcards in them. Defaults to
`False`.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
response : `requests.Response`
Response of the query from the server
"""
return self.query_object_async('\n'.join(object_names),
wildcard=wildcard, cache=cache,
get_query_payload=get_query_payload)
[docs] def query_region_async(self, coordinates, radius=2*u.arcmin,
equinox=2000.0, epoch='J2000', cache=True,
get_query_payload=False):
"""
Serves the same function as `query_region`, but
only collects the response from the Simbad server and returns.
Parameters
----------
coordinates : str or `astropy.coordinates` object
the identifier or coordinates around which to query.
radius : str or `~astropy.units.Quantity`, optional
the radius of the region. If missing, set to default
value of 2 arcmin.
equinox : float, optional
the equinox of the coordinates. If missing set to
default 2000.0.
epoch : str, optional
the epoch of the input coordinates. Must be specified as
[J|B] <epoch>. If missing, set to default J2000.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
response : `requests.Response`
Response of the query from the server.
"""
equinox = validate_equinox(equinox)
epoch = validate_epoch(epoch)
base_query_str = "query coo {ra} {dec} radius={rad} frame={frame} equi={equinox}"
if radius is None:
radius = 2*u.arcmin
header = self._get_query_header()
footer = self._get_query_footer()
ra, dec, frame = _parse_coordinates(coordinates)
# handle the vector case
if isinstance(ra, list):
vector = True
if len(ra) > 10000:
warnings.warn("For very large queries, you may receive a "
"timeout error. SIMBAD suggests splitting "
"queries with >10000 entries into multiple "
"threads", LargeQueryWarning)
if len(set(frame)) > 1:
raise ValueError("Coordinates have different frames")
else:
frame = set(frame).pop()
if vector and _has_length(radius) and len(radius) == len(ra):
# all good, continue
pass
elif vector and _has_length(radius) and len(radius) != len(ra):
raise ValueError("Mismatch between radii and coordinates")
elif vector and not _has_length(radius):
radius = [_parse_radius(radius)] * len(ra)
if vector:
query_str = "\n".join([base_query_str
.format(ra=ra_, dec=dec_, rad=rad_,
frame=frame, equinox=equinox)
for ra_, dec_, rad_ in zip(ra, dec, radius)])
else:
radius = _parse_radius(radius)
query_str = base_query_str.format(ra=ra, dec=dec, frame=frame,
rad=radius, equinox=equinox)
request_payload = {'script': "\n".join([header, query_str, footer])}
if get_query_payload:
return request_payload
response = self._request("POST", self.SIMBAD_URL, data=request_payload,
timeout=self.TIMEOUT, cache=cache)
return response
[docs] def query_catalog(self, catalog, verbose=False, cache=True,
get_query_payload=False):
"""
Queries a whole catalog.
Results may be very large -number of rows
should be controlled by configuring `SimbadClass.ROW_LIMIT`.
Parameters
----------
catalog : str
the name of the catalog.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
table : `~astropy.table.Table`
Query results table
"""
response = self.query_catalog_async(catalog, cache=cache,
get_query_payload=get_query_payload)
if get_query_payload:
return response
return self._parse_result(response, SimbadVOTableResult,
verbose=verbose)
[docs] def query_catalog_async(self, catalog, cache=True, get_query_payload=False):
"""
Serves the same function as `query_catalog`, but
only collects the response from the Simbad server and returns.
Parameters
----------
catalog : str
the name of the catalog.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
response : `requests.Response`
Response of the query from the server.
"""
request_payload = self._args_to_payload(catalog,
caller='query_catalog_async')
if get_query_payload:
return request_payload
response = self._request("POST", self.SIMBAD_URL, data=request_payload,
timeout=self.TIMEOUT, cache=cache)
return response
[docs] def query_bibobj(self, bibcode, verbose=False, get_query_payload=False):
"""
Query all the objects that are contained in the article specified by
the bibcode, and return results as a `~astropy.table.Table`.
Parameters
----------
bibcode : str
the bibcode of the article
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
table : `~astropy.table.Table`
Query results table
"""
response = self.query_bibobj_async(bibcode,
get_query_payload=get_query_payload)
if get_query_payload:
return response
return self._parse_result(response, SimbadVOTableResult,
verbose=verbose)
[docs] def query_bibobj_async(self, bibcode, cache=True, get_query_payload=False):
"""
Serves the same function as `query_bibobj`, but only collects the
response from the Simbad server and returns.
Parameters
----------
bibcode : str
the bibcode of the article
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
response : `requests.Response`
Response of the query from the server.
"""
request_payload = self._args_to_payload(bibcode, caller='query_bibobj_async')
if get_query_payload:
return request_payload
response = self._request("POST", self.SIMBAD_URL, data=request_payload,
timeout=self.TIMEOUT, cache=cache)
return response
[docs] def query_bibcode(self, bibcode, wildcard=False, verbose=False,
cache=True, get_query_payload=False):
"""
Queries the references corresponding to a given bibcode, and returns
the results in a `~astropy.table.Table`. Wildcards may be used to
specify bibcodes.
Parameters
----------
bibcode : str
the bibcode of the article
wildcard : boolean, optional
When it is set to `True` it implies that the object is specified
with wildcards. Defaults to `False`.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
table : `~astropy.table.Table`
Query results table
"""
response = self.query_bibcode_async(bibcode, wildcard=wildcard,
cache=cache,
get_query_payload=get_query_payload)
if get_query_payload:
return response
return self._parse_result(response, SimbadBibcodeResult,
verbose=verbose)
[docs] def query_bibcode_async(self, bibcode, wildcard=False, cache=True,
get_query_payload=False):
"""
Serves the same function as `query_bibcode`, but
only collects the response from the Simbad server and returns.
Parameters
----------
bibcode : str
the bibcode of the article
wildcard : boolean, optional
When it is set to `True` it implies that the object is specified
with wildcards. Defaults to `False`.
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
response : `requests.Response`
Response of the query from the server.
"""
request_payload = self._args_to_payload(
bibcode, wildcard=wildcard,
caller='query_bibcode_async', get_raw=True)
if get_query_payload:
return request_payload
response = self._request("POST", self.SIMBAD_URL, cache=cache,
data=request_payload, timeout=self.TIMEOUT)
return response
[docs] def query_objectids(self, object_name, verbose=False, cache=True,
get_query_payload=False):
"""
Query Simbad with an object name, and return a table of all
names associated with that object in a `~astropy.table.Table`.
Parameters
----------
object_name : str
name of object to be queried
get_query_payload : bool, optional
When set to `True` the method returns the HTTP request parameters.
Defaults to `False`.
Returns
-------
table : `~astropy.table.Table`
Query results table
"""
response = self.query_objectids_async(object_name, cache=cache,
get_query_payload=get_query_payload)
if get_query_payload:
return response
return self._parse_result(response, SimbadObjectIDsResult,
verbose=verbose)
[docs] def query_objectids_async(self, object_name, cache=True,
get_query_payload=False):
"""
Serves the same function as `query_objectids`, but
only collects the response from the Simbad server and returns.
Parameters
----------
object_name : str
name of object to be queried
Returns
-------
response : `requests.Response`
Response of the query from the server.
"""
request_payload = dict(script="\n".join(('format object "%IDLIST"',
'query id %s' % object_name)))
if get_query_payload:
return request_payload
response = self._request("POST", self.SIMBAD_URL, data=request_payload,
timeout=self.TIMEOUT, cache=cache)
return response
def _get_query_header(self, get_raw=False):
votable_fields = ','.join(self.get_votable_fields())
# if get_raw is set then don't fetch as votable
if get_raw:
return ""
votable_def = "votable {" + votable_fields + "}"
votable_open = "votable open"
return "\n".join([votable_def, votable_open])
def _get_query_footer(self, get_raw=False):
if get_raw:
return ""
votable_close = "votable close"
return votable_close
@validate_epoch_decorator
@validate_equinox_decorator
def _args_to_payload(self, *args, **kwargs):
"""
Takes the arguments from any of the query functions and returns a
dictionary that can be used as the data for an HTTP POST request.
"""
script = ""
caller = kwargs['caller']
del kwargs['caller']
get_raw = kwargs.pop('get_raw', False)
command = self._function_to_command[caller]
votable_header = self._get_query_header(get_raw)
votable_footer = self._get_query_footer(get_raw)
if self.ROW_LIMIT > 0:
script = "set limit " + str(self.ROW_LIMIT)
script = "\n".join([script, votable_header, command])
using_wildcard = False
if kwargs.get('wildcard'):
# necessary to have a space at the beginning and end
script += " wildcard "
del kwargs['wildcard']
using_wildcard = True
# now append args and kwds as per the caller
# if caller is query_region_async write coordinates as separate ra dec
# rename equinox to equi as required by SIMBAD script
if kwargs.get('equinox'):
kwargs['equi'] = kwargs['equinox']
del kwargs['equinox']
# remove default None from kwargs
# be compatible with python3
for key in list(kwargs):
if not kwargs[key]:
del kwargs[key]
# join in the order specified otherwise results in error
all_keys = ['radius', 'frame', 'equi', 'epoch']
present_keys = [key for key in all_keys if key in kwargs]
if caller == 'query_criteria_async':
for k in kwargs:
present_keys.append(k)
# need ampersands to join args
args_str = '&'.join([str(val) for val in args])
if len(args) > 0 and len(present_keys) > 0:
args_str += " & "
else:
args_str = ' '.join([str(val) for val in args])
kwargs_str = ' '.join("{key}={value}".format(key=key,
value=kwargs[key])
for key in present_keys)
# For the record, I feel dirty for writing this wildcard-case hack.
# This entire function should be refactored when someone has time.
allargs_str = ' '.join([" ", args_str, kwargs_str, "\n"])
if using_wildcard:
allargs_str = allargs_str.lstrip()
script += allargs_str
script += votable_footer
return dict(script=script)
def _parse_result(self, result, resultclass=SimbadVOTableResult,
verbose=False):
"""
Instantiate a Simbad*Result class and try to parse the
response with the .table property/method, then return the
resulting table. If data is not retrieved or the resulting
table is empty, return None. In case of problems, save
intermediate results for further debugging.
"""
self.last_response = result
try:
content = result.content.decode('utf-8')
self.last_parsed_result = resultclass(content, verbose=verbose)
if self.last_parsed_result.data is None:
return None
resulttable = self.last_parsed_result.table
if len(resulttable) == 0:
return None
except Exception as ex:
self.last_table_parse_error = ex
try:
self._last_query.remove_cache_file(self.cache_location)
except OSError:
# this is allowed: if `cache` was set to False, this
# won't be needed
pass
raise TableParseError("Failed to parse SIMBAD result! The raw "
"response can be found in "
"self.last_response, and the error in "
"self.last_table_parse_error. The attempted"
" parsed result is in "
"self.last_parsed_result.\n "
"Exception: " + str(ex))
resulttable.errors = self.last_parsed_result.errors
return resulttable
def _parse_coordinates(coordinates):
try:
coordinates = commons.parse_coordinates(coordinates)
# now c has some subclass of astropy.coordinate
# get ra, dec and frame
return _get_frame_coords(coordinates)
except (u.UnitsError, TypeError):
raise ValueError("Coordinates not specified correctly")
def _has_length(x):
# some objects have '__len__' attributes but have no len()
try:
len(x)
return True
except (TypeError, AttributeError):
return False
def _get_frame_coords(coordinates):
if _has_length(coordinates):
# deal with vectors differently
parsed = [_get_frame_coords(cc) for cc in coordinates]
return ([ra for ra, dec, frame in parsed],
[dec for ra, dec, frame in parsed],
[frame for ra, dec, frame in parsed])
if coordinates.frame.name == 'icrs':
ra, dec = _to_simbad_format(coordinates.ra, coordinates.dec)
return (ra, dec, 'ICRS')
elif coordinates.frame.name == 'galactic':
lon, lat = (str(coordinates.l.degree), str(coordinates.b.degree))
if lat[0] not in ['+', '-']:
lat = '+' + lat
return (lon, lat, 'GAL')
elif coordinates.frame.name == 'fk4':
ra, dec = _to_simbad_format(coordinates.ra, coordinates.dec)
return (ra, dec, 'FK4')
elif coordinates.frame.name == 'fk5':
ra, dec = _to_simbad_format(coordinates.ra, coordinates.dec)
return (ra, dec, 'FK5')
else:
raise ValueError("%s is not a valid coordinate" % coordinates)
def _to_simbad_format(ra, dec):
# This irrelevantly raises the exception
# "AttributeError: Angle instance has no attribute 'hour'"
ra = ra.to_string(u.hour, sep=':')
dec = dec.to_string(u.degree, sep=':', alwayssign='True')
return (ra.lstrip(), dec.lstrip())
def _parse_radius(radius):
try:
angle = coord.Angle(radius)
# find the most appropriate unit - d, m or s
nonzero_indices = [i for (i, val) in enumerate(angle.dms)
if int(val) > 0]
if len(nonzero_indices) > 0:
index = min(nonzero_indices)
else:
index = 2 # use arcseconds when radius smaller than 1 arcsecond
unit = ('d', 'm', 's')[index]
if unit == 'd':
return str(angle.degree) + unit
if unit == 'm':
return str(angle.arcmin) + unit
if unit == 's':
return str(angle.arcsec) + unit
except (coord.errors.UnitsError, AttributeError):
raise ValueError("Radius specified incorrectly")
Simbad = SimbadClass()