# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Common utilities for accessing VO simple services.
.. note::
Some functions are not used by Astroquery but kept for
backward-compatibility with ``astropy.vo.client``.
"""
import fnmatch
import json
import os
import re
import socket
import urllib
import warnings
from collections import defaultdict
from copy import deepcopy
from astropy.io.votable import parse_single_table, table, tree
from astropy.io.votable.exceptions import vo_raise, vo_warn, E19, W24, W25
from astropy.utils.console import color_print
from astropy.utils.data import get_readable_fileobj
from astropy.utils.data import conf as data_conf
from astropy.utils.exceptions import AstropyUserWarning
from astropy.utils.misc import JsonCustomEncoder
from astropy.utils.xml.unescaper import unescape_all
from .exceptions import (VOSError, MissingCatalog, DuplicateCatalogName,
DuplicateCatalogURL, InvalidAccessURL)
from ..utils.url_helpers import urljoin_keep_path
# Import configurable items declared in __init__.py
from . import conf
__all__ = ['VOSBase', 'VOSCatalog', 'VOSDatabase', 'get_remote_catalog_db',
'call_vo_service', 'list_catalogs']
__dbversion__ = 1
[docs]class VOSBase:
"""
Base class for VO catalog and database.
Parameters
----------
tree : JSON tree
"""
def __init__(self, tree):
self._tree = tree
def __getattr__(self, what):
"""Expose dictionary attributes."""
return getattr(self._tree, what)
def __getitem__(self, what):
"""Expose dictionary key look-up."""
return self._tree[what]
def __setitem__(self, what, value):
"""Expose dictionary key assignment."""
self._tree[what] = value
def __iter__(self):
"""Expose dictionary iteration."""
return iter(self._tree)
[docs] def dumps(self):
"""
Dump the contents into a string.
Returns
-------
s : str
Contents as JSON string dump.
"""
return json.dumps(self._tree, cls=JsonCustomEncoder, sort_keys=True,
indent=4)
[docs]class VOSCatalog(VOSBase):
"""
A class to represent VO Service Catalog.
Parameters
----------
tree : JSON tree
Raises
------
VOSError
Missing necessary key(s).
"""
_compulsory_keys = ('title', 'url')
def __init__(self, tree):
super(VOSCatalog, self).__init__(tree)
for key in self._compulsory_keys:
if key not in self._tree:
raise VOSError('Catalog must have "{0}" key.'.format(key))
def __str__(self): # pragma: no cover
"""Show the most important and unique things about a catalog."""
out_str = '\n'.join(['{0}: {1}'.format(key, self._tree[key])
for key in self._compulsory_keys
if key in self._tree])
return out_str
[docs] def delete_attribute(self, key):
"""
Delete given metadata key and its value from the catalog.
Parameters
----------
key : str
Metadata key to delete.
Raises
------
KeyError
Key not found.
VOSError
Key must exist in catalog, therefore cannot be deleted.
"""
if key in self._compulsory_keys:
raise VOSError('{0} must exist in catalog, therefore cannot be '
'deleted.'.format(key))
del self._tree[key]
[docs] @classmethod
def create(cls, title, url, **kwargs):
"""
Create a new VO Service Catalog with user parameters.
Parameters
----------
title : str
Title of the catalog.
url : str
Access URL of the service. This is used to build queries.
kwargs : dict
Additional metadata as keyword-value pairs describing the catalog,
except 'title' and 'url'.
Returns
-------
cat : `VOSCatalog`
VO Service Catalog.
Raises
------
TypeError
Multiple values given for keyword argument.
"""
tree = {'title': title, 'url': url}
tree.update(kwargs)
return cls(tree)
[docs]class VOSDatabase(VOSBase):
"""
A class to represent a collection of :class:`VOSCatalog`.
Parameters
----------
tree : JSON tree
Raises
------
VOSError
If given ``tree`` does not have 'catalogs' key
or catalog is invalid.
"""
def __init__(self, tree):
if 'catalogs' not in tree:
raise VOSError("Invalid VO service catalog database")
super(VOSDatabase, self).__init__(tree)
self._catalogs = tree['catalogs']
if self.version > __dbversion__: # pragma: no cover
vo_warn(W24)
# Maps access URL to primary key(s).
# URL is the real key, but we chose title because it is more readable
# when written out to JSON.
self._url_keys = defaultdict(list)
for key, cat in self.get_catalogs():
self._url_keys[cat['url']].append(key)
def __str__(self): # pragma: no cover
"""Show the most important and unique things about a database."""
return '\n'.join(sorted(self._catalogs))
def __len__(self):
"""Return the number of catalogs in database."""
return len(self._catalogs)
@property
def version(self):
"""Database version number."""
return self._tree['__version__']
[docs] def get_catalogs(self):
"""Iterator to get all catalogs."""
for key, val in self._catalogs.items():
yield key, VOSCatalog(val)
[docs] def get_catalogs_by_url(self, url):
"""Like :meth:`get_catalogs` but using access URL look-up."""
keys = self._url_keys[url]
for key in keys:
yield key, VOSCatalog(self._catalogs[key])
[docs] def get_catalog(self, name):
"""
Get one catalog of given name.
Parameters
----------
name : str
Primary key identifying the catalog.
Returns
-------
obj : `VOSCatalog`
Raises
------
MissingCatalog
If catalog is not found.
"""
if name not in self._catalogs:
raise MissingCatalog("No catalog '{0}' found.".format(name))
return VOSCatalog(self._catalogs[name])
[docs] def get_catalog_by_url(self, url):
"""
Like :meth:`get_catalog` but using access URL look-up.
On multiple matches, only first match is returned.
"""
keys = self._url_keys[url]
if len(keys) < 1:
raise MissingCatalog(
"No catalog with URL '{0}' found.".format(url))
return VOSCatalog(self._catalogs[keys[0]])
@staticmethod
def _match_pattern(all_keys, pattern, sort):
"""Used by :meth:`list_catalogs` and :meth:`list_catalogs_by_url`."""
if pattern is None or len(all_keys) == 0:
out_arr = all_keys
else:
pattern = re.compile(fnmatch.translate('*' + pattern + '*'),
re.IGNORECASE)
out_arr = [s for s in all_keys if pattern.match(s)]
if sort:
out_arr.sort()
return out_arr
[docs] def list_catalogs(self, pattern=None, sort=True):
"""
List catalog names.
Parameters
----------
pattern : str or `None`
If given string is anywhere in a catalog name, it is
considered a matching catalog. It accepts patterns as
in :py:mod:`fnmatch` and is case-insensitive.
By default, all catalogs are returned.
sort : bool
Sort output in alphabetical order. If not sorted, the
order depends on dictionary hashing. Default is `True`.
Returns
-------
out_arr : list of str
List of catalog names.
"""
return self._match_pattern(list(self._catalogs), pattern, sort)
[docs] def list_catalogs_by_url(self, pattern=None, sort=True):
"""Like :meth:`list_catalogs` but using access URL."""
out_arr = self._match_pattern(list(self._url_keys), pattern, sort)
# Discard URL that maps to nothing
return [a for a in out_arr if len(self._url_keys[a]) > 0]
[docs] def add_catalog(self, name, cat, allow_duplicate_url=False):
"""
Add a catalog to database.
Parameters
----------
name : str
Primary key for the catalog.
cat : `VOSCatalog`
Catalog to add.
allow_duplicate_url : bool
Allow catalog with duplicate access URL?
Raises
------
VOSError
Invalid catalog.
DuplicateCatalogName
Catalog with given name already exists.
DuplicateCatalogURL
Catalog with given access URL already exists.
"""
if not isinstance(cat, VOSCatalog):
raise VOSError('{0} is not a VO Service Catalog.'.format(cat))
if name in self._catalogs:
raise DuplicateCatalogName('{0} already exists.'.format(name))
url = cat['url']
names = self._url_keys[url]
if len(names) > 0 and not allow_duplicate_url:
raise DuplicateCatalogURL(
'{0} already exists: {1}'.format(url, names))
self._catalogs[name] = deepcopy(cat._tree)
self._url_keys[url].append(name)
[docs] def add_catalog_by_url(self, name, url, **kwargs):
"""
Like :meth:`add_catalog` but the catalog is created with
only the given name and access URL.
Parameters
----------
name : str
Primary key for the catalog.
url : str
Access URL of the service. This is used to build queries.
kwargs : dict
Keywords accepted by :meth:`add_catalog`.
"""
self.add_catalog(name, VOSCatalog.create(name, url), **kwargs)
[docs] def delete_catalog(self, name):
"""
Delete a catalog from database with given name.
Parameters
----------
name : str
Primary key identifying the catalog.
Raises
------
MissingCatalog
If catalog is not found.
"""
if name not in self._catalogs:
raise MissingCatalog('{0} not found.'.format(name))
self._url_keys[self._catalogs[name]['url']].remove(name)
del self._catalogs[name]
[docs] def delete_catalog_by_url(self, url):
"""
Like :meth:`delete_catalog` but using access URL.
On multiple matches, all matches are deleted.
"""
keys = sorted(self._url_keys[url]) # Makes a copy of list
if len(keys) < 1:
raise MissingCatalog('{0} not found.'.format(url))
for key in keys:
self.delete_catalog(key)
[docs] def merge(self, other, **kwargs):
"""
Merge two database together.
Parameters
----------
other : `VOSDatabase`
The other database to merge.
kwargs : dict
Keywords accepted by :meth:`add_catalog`.
Returns
-------
db : `VOSDatabase`
Merged database.
Raises
------
VOSError
Invalid database or incompatible version.
"""
if not isinstance(other, VOSDatabase):
raise VOSError('{0} is not a VO database.'.format(other))
if other.version != self.version:
raise VOSError('Incompatible database version: {0}, '
'{1}'.format(self.version, other.version))
db = VOSDatabase.create_empty()
for old_db in (self, other):
for key, cat in old_db.get_catalogs():
db.add_catalog(key, cat, **kwargs)
return db
[docs] def to_json(self, filename, overwrite=False):
"""
Write database content to a JSON file.
Parameters
----------
filename : str
JSON file.
overwrite : bool
If `True`, overwrite the output file if it exists.
Raises
------
OSError
If the file exists and ``overwrite`` is `False`.
"""
if os.path.exists(filename) and not overwrite:
raise OSError('{0} exists.'.format(filename))
with open(filename, 'w') as fd:
fd.write(self.dumps())
[docs] @classmethod
def create_empty(cls):
"""
Create an empty database of VO services.
Empty database format::
{
"__version__": 1,
"catalogs" : {
}
}
Returns
-------
db : `VOSDatabase`
Empty database.
"""
return cls({'__version__': __dbversion__, 'catalogs': {}})
[docs] @classmethod
def from_json(cls, filename, **kwargs):
"""
Create a database of VO services from a JSON file.
Example JSON format for Cone Search::
{
"__version__": 1,
"catalogs" : {
"My Cone Search": {
"capabilityClass": "ConeSearch",
"title": "My Cone Search",
"url": "http://foo/cgi-bin/search?CAT=bar&",
...
},
"Another Cone Search": {
...
}
}
}
Parameters
----------
filename : str
JSON file.
kwargs : dict
Keywords accepted by
:func:`~astropy.utils.data.get_readable_fileobj`.
Returns
-------
db : `VOSDatabase`
Database from given file.
"""
with get_readable_fileobj(filename, **kwargs) as fd:
tree = json.load(fd)
return cls(tree)
[docs] @classmethod
def from_registry(cls, registry_url, timeout=60, **kwargs):
"""
Create a database of VO services from VO registry URL.
This is described in detail in :ref:`vo-sec-validator-build-db`,
except for the ``validate_xxx`` keys that are added by the
validator itself.
Parameters
----------
registry_url : str
URL of VO registry that returns a VO Table.
For example, see
``astroquery.vo_conesearch.validator.conf.cs_mstr_list``.
Pedantic is automatically set to `False` for parsing.
timeout : number
Temporarily set ``astropy.utils.data.conf.remote_timeout``
to this value to avoid time out error while reading the
entire registry.
kwargs : dict
Keywords accepted by
:func:`~astropy.utils.data.get_readable_fileobj`.
Returns
-------
db : `VOSDatabase`
Database from given registry.
Raises
------
VOSError
Invalid VO registry.
"""
# Download registry as VO table
with data_conf.set_temp('remote_timeout', timeout):
with get_readable_fileobj(registry_url, **kwargs) as fd:
tab_all = parse_single_table(fd, verify='warn')
# Registry must have these fields
compulsory_fields = ['res_title', 'access_url']
cat_fields = tab_all.array.dtype.names
for field in compulsory_fields:
if field not in cat_fields: # pragma: no cover
raise VOSError('"{0}" is missing from registry.'.format(field))
title_counter = defaultdict(int)
title_fmt = '{0} {1}'
db = cls.create_empty()
# Each row in the table becomes a catalog
for arr in tab_all.array.data:
cur_cat = {}
cur_key = ''
# Process each field and build the catalog.
# Catalog is completely built before being thrown out
# because codes need less changes should we decide to
# allow duplicate URLs in the future.
for field in cat_fields:
# For primary key, a number needs to be appended to the title
# because registry can have multiple entries with the same
# title but different URLs.
if field == 'res_title':
cur_title = arr['res_title']
title_counter[cur_title] += 1 # Starts with 1
if isinstance(cur_title, bytes): # ASTROPY_LT_4_1
cur_key = title_fmt.format(cur_title.decode('utf-8'),
title_counter[cur_title])
else:
cur_key = title_fmt.format(cur_title,
title_counter[cur_title])
# Special handling of title and access URL,
# otherwise no change.
if field == 'access_url':
s = unescape_all(arr['access_url'])
if isinstance(s, bytes): # ASTROPY_LT_4_1
s = s.decode('utf-8')
cur_cat['url'] = s
elif field == 'res_title':
cur_cat['title'] = arr[field]
else:
cur_cat[field] = arr[field]
# New field to track duplicate access URLs.
cur_cat['duplicatesIgnored'] = 0
# Add catalog to database, unless duplicate access URL exists.
# In that case, the entry is thrown out and the associated
# counter is updated.
dup_keys = db._url_keys[cur_cat['url']]
if len(dup_keys) < 1:
db.add_catalog(
cur_key, VOSCatalog(cur_cat), allow_duplicate_url=False)
else:
db._catalogs[dup_keys[0]]['duplicatesIgnored'] += 1
warnings.warn(
'{0} is thrown out because it has same access URL as '
'{1}.'.format(cur_key, dup_keys[0]), AstropyUserWarning)
return db
[docs]def get_remote_catalog_db(dbname, cache=True, verbose=True):
"""
Get a database of VO services (which is a JSON file) from a remote
location.
Parameters
----------
dbname : str
Prefix of JSON file to download from
``astroquery.vo_conesearch.conf.vos_baseurl``.
cache : bool
Use caching for VO Service database. Access to actual VO
websites referenced by the database still needs internet
connection.
verbose : bool
Show download progress bars.
Returns
-------
db : `VOSDatabase`
A database of VO services.
"""
return VOSDatabase.from_json(
urljoin_keep_path(conf.vos_baseurl, dbname + '.json'),
encoding='utf8', cache=cache, show_progress=verbose)
def _get_catalogs(service_type, catalog_db, **kwargs):
"""
Expand ``catalog_db`` to a list of catalogs.
Parameters
----------
service_type, catalog_db
See :func:`call_vo_service`.
kwargs : dict
Keywords accepted by :func:`get_remote_catalog_db`.
Returns
-------
catalogs : list of tuple
List of catalogs in the form of ``(key, VOSCatalog)``.
Raises
------
VOSError
Invalid ``catalog_db``.
"""
if catalog_db is None:
catalog_db = get_remote_catalog_db(service_type, **kwargs)
catalogs = catalog_db.get_catalogs()
elif isinstance(catalog_db, VOSDatabase):
catalogs = catalog_db.get_catalogs()
elif isinstance(catalog_db, (VOSCatalog, str)):
catalogs = [(None, catalog_db)]
elif isinstance(catalog_db, list):
for x in catalog_db:
assert (isinstance(x, (VOSCatalog, str))
and not isinstance(x, VOSDatabase))
catalogs = [(None, x) for x in catalog_db]
else: # pragma: no cover
raise VOSError('catalog_db must be a catalog database, '
'a list of catalogs, or a catalog')
return catalogs
def _vo_service_request(url, pedantic, kwargs, cache=True, verbose=False):
"""
This is called by :func:`call_vo_service`.
Raises
------
InvalidAccessURL
Invalid access URL.
"""
if len(kwargs) and not url.endswith(('?', '&')):
raise InvalidAccessURL("url should already end with '?' or '&'")
query = []
for key, value in kwargs.items():
query.append('{0}={1}'.format(
urllib.parse.quote(key), urllib.parse.quote_plus(str(value))))
parsed_url = url + '&'.join(query)
with get_readable_fileobj(parsed_url, encoding='binary', cache=cache,
show_progress=verbose) as req:
pedantic = 'exception' if pedantic else 'warn'
tab = table.parse(req, filename=parsed_url, verify=pedantic)
return vo_tab_parse(tab, url, kwargs)
def vo_tab_parse(tab, url, kwargs):
"""
In case of errors from the server, a complete and correct
'stub' VOTable file may still be returned.
This is to detect that case.
Parameters
----------
tab : `astropy.io.votable.tree.VOTableFile`
url : str
URL used to obtain ``tab``.
kwargs : dict
Keywords used to obtain ``tab``, if any.
Returns
-------
out_tab : `astropy.io.votable.tree.Table`
Raises
------
IndexError
Table iterator fails.
VOSError
Server returns error message or invalid table.
"""
for param in tab.iter_fields_and_params():
if param.ID is not None and param.ID.lower() == 'error':
if isinstance(param, tree.Param):
e = param.value
else: # pragma: no cover
e = ''
raise VOSError("Catalog server '{0}' returned error '{1}'".format(
url, e))
for info in tab.infos:
if info.name is not None and info.name.lower() == 'error':
raise VOSError("Catalog server '{0}' returned error '{1}'".format(
url, info.value))
if tab.resources == []: # pragma: no cover
vo_raise(E19)
for info in tab.resources[0].infos:
if ((info.name == 'QUERY_STATUS' and info.value != 'OK')
or (info.name is not None and info.name.lower() == 'error')):
if info.content is not None: # pragma: no cover
long_descr = ':\n{0}'.format(info.content)
else:
long_descr = ''
raise VOSError("Catalog server '{0}' returned status "
"'{1}'{2}".format(url, info.value, long_descr))
out_tab = tab.get_first_table()
kw_sr = [k for k in kwargs if 'sr' == k.lower()]
if len(kw_sr) == 0:
sr = 0
else:
sr = kwargs.get(kw_sr[0])
if sr != 0 and out_tab.array.size <= 0:
raise VOSError("Catalog server '{0}' returned {1} result".format(
url, out_tab.array.size))
out_tab.url = url # Track the URL
return out_tab
[docs]def call_vo_service(service_type, catalog_db=None, pedantic=None,
verbose=True, cache=True, kwargs={}):
"""
Makes a generic VO service call.
Parameters
----------
service_type : str
Name of the type of service, e.g., 'conesearch_good'.
Used in error messages and to select a catalog database
if ``catalog_db`` is not provided.
catalog_db
May be one of the following, in order from easiest to
use to most control:
- `None`: A database of ``service_type`` catalogs is
downloaded from ``astroquery.vo_conesearch.conf.vos_baseurl``.
The first catalog in the database to successfully return a
result is used.
- *catalog name*: A name in the database of
``service_type`` catalogs at
``astroquery.vo_conesearch.conf.vos_baseurl`` is used.
For a list of acceptable names, use :func:`list_catalogs`.
- *url*: The prefix of a URL to a IVOA Service for
``service_type``. Must end in either '?' or '&'.
- :class:`VOSCatalog` object: A specific catalog manually
downloaded and selected from the database
(see :ref:`vo-sec-client-vos`).
- Any of the above 3 options combined in a list, in which case
they are tried in order.
pedantic : bool or `None`
When `True`, raise an error when the file violates the spec,
otherwise issue a warning. Warnings may be controlled using
:py:mod:`warnings` module. When not provided, uses the
configuration setting ``astroquery.vo_conesearch.conf.pedantic``,
which defaults to `False`.
verbose : bool
Verbose output.
cache : bool
Use caching for VO Service database. Access to actual VO
websites referenced by the database still needs internet
connection.
kwargs : dictionary
Keyword arguments to pass to the catalog service.
No checking is done that the arguments are accepted by
the service, etc.
Returns
-------
obj : `astropy.io.votable.tree.Table`
First table from first successful VO service request.
Raises
------
VOSError
If VO service request fails.
"""
n_timed_out = 0
catalogs = _get_catalogs(service_type, catalog_db, cache=cache,
verbose=verbose)
if pedantic is None: # pragma: no cover
pedantic = conf.pedantic
for name, catalog in catalogs:
if isinstance(catalog, str):
if catalog.startswith('http'):
url = catalog
else:
remote_db = get_remote_catalog_db(service_type, cache=cache,
verbose=verbose)
catalog = remote_db.get_catalog(catalog)
url = catalog['url']
else:
url = catalog['url']
if verbose: # pragma: no cover
color_print('Trying {0}'.format(url), 'green')
try:
return _vo_service_request(url, pedantic, kwargs, cache=cache,
verbose=verbose)
except Exception as e:
vo_warn(W25, (url, str(e)))
if hasattr(e, 'reason') and isinstance(e.reason, socket.timeout):
n_timed_out += 1
err_msg = 'None of the available catalogs returned valid results.'
if n_timed_out > 0:
err_msg += ' ({0} URL(s) timed out.)'.format(n_timed_out)
raise VOSError(err_msg)
[docs]def list_catalogs(service_type, cache=True, verbose=True, **kwargs):
"""List the catalogs available for the given service type.
Parameters
----------
service_type : str
Name of the type of service, e.g., 'conesearch_good'.
cache : bool
Use caching for VO Service database. Access to actual VO
websites referenced by the database still needs internet
connection.
verbose : bool
Show download progress bars.
pattern : str or `None`
If given string is anywhere in a catalog name, it is
considered a matching catalog. It accepts patterns as
in :py:mod:`fnmatch` and is case-insensitive.
By default, all catalogs are returned.
sort : bool
Sort output in alphabetical order. If not sorted, the
order depends on dictionary hashing. Default is `True`.
Returns
-------
arr : list of str
List of catalog names.
"""
return get_remote_catalog_db(service_type, cache=cache,
verbose=verbose).list_catalogs(**kwargs)