Source code for biothings.web.services.metadata
import asyncio
import logging
from collections import defaultdict
from datetime import datetime
from functools import reduce
from operator import add
from dateutil.parser import parse as dtparse
from biothings.utils.common import get_loop
logger = logging.getLogger(__name__)
[docs]
class BiothingsMetadata:
    """
    Base holder for the per-biothing-type metadata caches.

    Every cache is a ``defaultdict(dict)`` keyed by biothing type, so reading
    a type that has not been populated yet yields (and stores) an empty dict.

    ``biothing_metadata`` -- cached, generated from mappings::

        {
            "variant": {
                "biothing_type": "variant",
                "build_date": "2020-05-08T17:33:59.756164",
                "build_version": "20200508",
                "src": {"cosmic": { ... }, ... },
                "stats": {"total": 928585791 } }
            "gene": { ... }
        }

    ``biothing_mappings``::

        {
            "variant": {
                'aeolus': {'properties': {'drug_code': {...}, ...}},
                'cadd': {'properties': {'1000g': {...}, 'alt': {...}, ...}},
                'cgi': {'properties': {'association': {...}, ...}},
                'chebi': {'properties': {'brand_names': {...}, ...}}, ... }
            "gene": { ... }
        }

    ``biothing_licenses``::

        {
            "variant": {
                'aeolus': 'http://bit.ly/2DIxWwF',
                'cadd': 'http://bit.ly/2TIuab9',
                'cgi': 'http://bit.ly/2FqS871',
                'chebi': 'http://bit.ly/2KAUCAm', ... }
            "gene": { ... }
        }
    """

    def __init__(self):
        # one independent cache per kind of information
        self.biothing_metadata = defaultdict(dict)
        self.biothing_mappings = defaultdict(dict)
        self.biothing_licenses = defaultdict(dict)
[docs]
class BiothingsESMetadata(BiothingsMetadata):
    """
    Metadata caches populated from Elasticsearch indices.

    Args:
        indices: mapping of biothing type to index pattern. The ``None`` key
            holds the default pattern used when no type is specified. A falsy
            value means all indices on the host (``{None: "_all"}``). When the
            ``None`` key is missing, it is added to the mapping that was
            passed in, pointing at the first pattern of that mapping.
        client: the Elasticsearch client. ``refresh`` supports instances of
            ``elasticsearch.Elasticsearch`` and
            ``elasticsearch.AsyncElasticsearch``.

    Every type in ``indices`` is refreshed once on construction. When
    ``refresh`` returns a coroutine, it is scheduled as a task on the loop
    returned by ``get_loop()`` and the finished task is logged at debug level.
    """

    def __init__(self, indices, client):
        super().__init__()

        if not indices:
            # all indices on the host
            indices = {None: "_all"}
        if None not in indices:
            # default index pattern when no type specified
            indices[None] = next(iter(indices.values()))

        self.indices = indices
        self.client = client

        # initial refresh
        loop = get_loop()
        for btype in self.indices:
            obj = self.refresh(btype)
            if asyncio.iscoroutine(obj):
                try:  # py3.8+
                    task = loop.create_task(obj, name=str(btype))
                except TypeError:
                    task = loop.create_task(obj)
                task.add_done_callback(logger.debug)

    @property
    def types(self):  # biothing_type(s)
        """Tuple of the truthy keys of ``indices`` (the ``None`` default is excluded)."""
        return tuple(filter(None, self.indices.keys()))

    def update(self, biothing_type, info, count):
        """
        Read ES index mappings for the corresponding biothing_type,
        Populate datasource info and field properties from mappings.

        Args:
            biothing_type: key of ``indices`` being updated; ``None`` is the default.
            info: index information keyed by index name, as returned by
                ``client.indices.get`` in ``_refresh`` / ``_async_refresh``.
            count: count response; its ``"count"`` entry becomes the total
                in the metadata stats.
        """
        _type = biothing_type

        # try to resolve default to an equivalent
        # and concrete biothing_type (in meta) to display
        if _type is None:
            for type_, pattern in self.indices.items():
                # The None entry always equals itself; skipping it lets a
                # concrete type sharing the default pattern be found even
                # when None comes first in the mapping.
                if type_ is not None and self.indices[None] == pattern:
                    _type = type_
                    break

        reader = _BiothingsESMetadataReader(_type, info, count)
        # results are cached under the requested key, not the resolved one
        self.biothing_metadata[biothing_type] = reader.get_metadata()
        self.biothing_mappings[biothing_type] = reader.get_mappings()
        self.biothing_licenses[biothing_type] = reader.get_licenses()

    def refresh(self, biothing_type=None):
        """
        Reload the caches of ``biothing_type`` from Elasticsearch.

        Returns the index info for a synchronous client, a coroutine that
        resolves to the index info for an asynchronous client, and ``None``
        when the client is neither.
        """
        from elasticsearch import AsyncElasticsearch, Elasticsearch

        if isinstance(self.client, Elasticsearch):
            return self._refresh(biothing_type)
        elif isinstance(self.client, AsyncElasticsearch):
            return self._async_refresh(biothing_type)

    def _refresh(self, biothing_type):
        """Synchronous refresh: fetch index info and count, then update the caches."""
        index = self.indices[biothing_type]
        info = self.client.indices.get(index=index)
        count = self.client.count(index=index)
        self.update(biothing_type, info, count)
        return info

    async def _async_refresh(self, biothing_type):
        """Asynchronous counterpart of ``_refresh``; awaits both client calls."""
        index = self.indices[biothing_type]
        info = await self.client.indices.get(index=index)
        count = await self.client.count(index=index)
        self.update(biothing_type, info, count)
        return info
[docs]
class BiothingsMongoMetadata(BiothingsMetadata):
    """
    Metadata caches for a document database accessed through ``client``.

    ``collections`` maps a biothing type to a collection name, which is
    looked up on ``client`` by subscription.
    """

    def __init__(self, collections, client):
        super().__init__()
        self.collections = collections
        self.client = client

    @property
    def types(self):  # biothing_type(s)
        """Tuple of the truthy keys of ``collections``."""
        return tuple(name for name in self.collections.keys() if name)

    async def refresh(self, biothing_type):
        """Store metadata for ``biothing_type`` with the estimated document count as total."""
        collection_name = self.collections[biothing_type]
        collection = self.client[collection_name]
        # https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html
        # #pymongo.collection.Collection.estimated_document_count
        total = collection.estimated_document_count()
        hub_meta = BiothingHubMeta(biothing_type=biothing_type, stats={"total": total})
        self.biothing_metadata[biothing_type] = hub_meta.to_dict()

    def get_mappings(self, biothing_type):
        """
        Return a placeholder mapping; ``biothing_type`` is not used.

        document database does not have data schema
        however, it might be possible to extract those indexed fields
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html
        #pymongo.collection.Collection.list_indexes
        """
        not_available = {"__N/A__": True}
        return not_available
[docs]
class BiothingsSQLMetadata(BiothingsMetadata):
    """
    Metadata caches for SQL tables queried through ``client``.

    ``tables`` maps a biothing type to a table name; ``client`` must provide
    ``execute(statement)`` returning a cursor-like result.
    """

    def __init__(self, tables, client):
        super().__init__()
        self.tables = tables
        self.client = client

    @property
    def types(self):  # biothing_type(s)
        """Tuple of the truthy keys of ``tables``."""
        return tuple(filter(None, self.tables.keys()))

    async def refresh(self, biothing_type):
        """
        Derive field mappings and metadata for ``biothing_type`` from its table.

        The field type of each column is the Python type name of the value in
        the first row. For an empty table every column is reported as
        ``"NoneType"``, the same value a NULL in the first row produces.
        Nothing is stored when the statement does not return rows.
        """
        # https://docs.sqlalchemy.org/en/14/core/reflection.html
        # This is a temporary solution as a proof of concept.
        # The implementation should probably be refined.
        table = self.tables[biothing_type]
        cursor = self.client.execute(f"SELECT * FROM {table}")
        if not cursor.returns_rows:
            return

        columns = list(cursor.keys())
        first_row = cursor.fetchone()
        if first_row is None:
            # empty table: no sample values to inspect, keep the column names
            first_row = [None] * len(columns)

        self.biothing_mappings[biothing_type] = {
            key: {"type": type(val).__name__} for key, val in zip(columns, first_row)
        }
        # NOTE(review): rowcount is used as the total as before -- confirm the
        # driver reports a meaningful value for SELECT statements.
        self.biothing_metadata[biothing_type] = BiothingHubMeta(
            biothing_type=biothing_type, stats=dict(total=cursor.rowcount)
        ).to_dict()
class _BiothingsESMetadataReader:
    """
    Read http://<elasticsearch>/<index_pattern>/ and ./_stats

    One matching index: that index's info is used as is.
    Several matching indices: their results are combined.
    No matching index: empty dictionaries are returned.
    """

    def __init__(self, biothing_type, info, count):
        self.biothing_type = biothing_type
        self.document_count = count
        # one _ESIndex wrapper per index name found in the info response
        self.indices_info = {}
        for index_name, index_body in info.items():
            self.indices_info[index_name] = _ESIndex(biothing_type, **index_body)

    @staticmethod
    def _combine(props):
        """Add up the given props and export the sum as a dict ({} when empty)."""
        if not props:
            return {}
        return reduce(add, props).to_dict()

    def get_mappings(self):
        """
        Field properties served by the metadata field endpoint, e.g.:
        {
            'aeolus': {'properties': {'drug_code': {...}, ...}},
            'cadd': {'properties': {'1000g': {...}, 'alt': {...}, ...}},
            'cgi': {'properties': {'association': {...}, ...}},
            'chebi': {'properties': {'brand_names': {...}, ...}},
            ...
        }
        """
        return self._combine([index.get_mappings() for index in self.indices_info.values()])

    def get_licenses(self):
        """
        Data licensing information as source-URL pairs, e.g.:
        {
            'aeolus': 'http://bit.ly/2DIxWwF',
            'cadd': 'http://bit.ly/2TIuab9',
            'cgi': 'http://bit.ly/2FqS871',
            'chebi': 'http://bit.ly/2KAUCAm',
            ...
        }
        """
        return self._combine([index.get_licenses() for index in self.indices_info.values()])

    def get_metadata(self):
        """
        Description of the data under this type, e.g.:
        {
            "biothing_type": "variant",
            "build_date": "2020-05-08T17:33:59.756164",
            "build_version": "20200508",
            "src": {"cosmic": { ... }, ... },
            "stats": {"total": 928585791 }
        }
        The "_biothing" and "_indices" entries are always added on top.
        """
        combined = self._combine([index.get_metadata() for index in self.indices_info.values()])
        if combined:
            # indices disagreeing on the type fall back to the requested one
            if combined.get("biothing_type") == "__multiple__":
                combined["biothing_type"] = self.biothing_type
            # the total always comes from the count response
            combined["stats"]["total"] = self.document_count["count"]
        combined["_biothing"] = self.biothing_type
        combined["_indices"] = list(self.indices_info)
        return combined
class _ESIndex:
    """
    Wrapper around the info of a single index, http://<elasticsearch>/<index>.
    Its getters return combinable BiothingMetaProp objects.
    """

    def __init__(self, biothing, aliases, mappings, settings):
        self.biothing = biothing
        self.aliases = aliases
        self.mappings = _ESIndexMappings(mappings)
        self.settings = _ESIndexSettings(settings)

    def get_metadata(self):
        """
        Return a BiothingHubMeta instance.

        The index "_meta" is used when it is not empty. Otherwise (or when
        building from it raises KeyError) the metadata is derived from the
        index settings, with empty src and stats.
        """
        hub_meta = self.mappings.metadata
        if hub_meta:
            try:
                return BiothingHubMeta(**hub_meta)
            except KeyError:
                pass  # fall back to the settings-based metadata below

        from_settings = {
            "biothing_type": self.biothing,
            "build_date": self.settings.get_creation_date().isoformat(),
            "build_version": self.settings.get_index_version(),
            "src": {},
            "stats": {},
        }
        return BiothingHubMeta(**from_settings)

    def get_licenses(self):
        """Return the license urls found in the index metadata as BiothingLicenses."""
        source_licenses = self.mappings.extract_licenses()
        return BiothingLicenses(source_licenses)

    def get_mappings(self):
        """Return the field properties of the index as BiothingMappings."""
        field_properties = self.mappings.properties
        return BiothingMappings(field_properties)
class _ESIndexSettings:
    """
    Wrapper around the settings section of one ES index, e.g.:
    {
        "index": {
            "number_of_shards": "1",
            "auto_expand_replicas": "0-1",
            "provided_name": ".tasks",
            "creation_date": "1566293197607",
            "priority": "2147483647",
            "number_of_replicas": "0",
            "uuid": "yWBk0qw0QXmEuxJFas3mIg",
            "version": {
                "created": "6050099"
            }
        }
    }
    Only the content under "index" is kept.
    """

    def __init__(self, setting):
        self.index = setting["index"]

    def get_creation_date(self):
        """Return "creation_date" (epoch milliseconds) as a naive local datetime."""
        epoch_millis = int(self.index["creation_date"])
        return datetime.fromtimestamp(epoch_millis / 1000)

    def get_index_version(self):
        """Return the "updated" version when present, the "created" one otherwise."""
        version = self.index["version"]
        chosen = "updated" if "updated" in version else "created"
        return version[chosen]
class _ESIndexMappings:
    """
    Wrapper around the mappings section of one ES index:
    {
        # this level is only available for es6
        "<doc_type>": {
            'properties': { ... },   # exposed as self.properties (mapping)
            '_meta': {               # exposed as self.metadata
                "src" : { ... }      # scanned for licenses
                ...
            },
            ...
        }
    }
    """

    def __init__(self, mapping):
        # for elasticsearch version 6.x the mapping is nested under doc_type;
        # only one type per index is supported, so when the single top-level
        # entry is a dict holding "properties", that dict is used instead
        if len(mapping) == 1:
            ((sole_key, nested),) = mapping.items()
            if sole_key != "properties" and isinstance(nested, dict) and "properties" in nested:
                mapping = nested

        # both flags are popped, i.e. removed from the (unwrapped) input dict
        self.enabled = mapping.pop("enabled", True)
        self.dynamic = mapping.pop("dynamic", True)
        self.properties = mapping.get("properties", {})
        self.metadata = mapping.get("_meta", {})

    def extract_licenses(self):
        """
        Return source name - license url pairs.
        The short url is preferred; sources without any url are left out.
        """
        url_by_source = {}
        for source, details in self.metadata.get("src", {}).items():
            for url_key in ("license_url_short", "license_url"):
                if url_key in details:
                    url_by_source[source] = details[url_key]
                    break
        return url_by_source
[docs]
class BiothingMetaProp:
    """
    Interface of a combinable piece of biothing metadata.

    Subclasses implement ``__add__`` to merge two instances into a new one and
    ``to_dict`` to export the content as a plain dictionary; the ES metadata
    reader in this module adds the props up and calls ``to_dict`` on the sum.

    NOTE
    The current implementations below
    may not be able to properly handle
    field (key/source) collisions.
    """

    def __add__(self, other):
        """Combine with ``other``; must be provided by subclasses."""
        raise NotImplementedError

    def to_dict(self):
        """Export as a dictionary; must be provided by subclasses."""
        raise NotImplementedError
[docs]
class BiothingLicenses(BiothingMetaProp):
    """
    Source name - license url pairs that can be combined with ``+``.

    Args:
        licenses: dictionary of source name to license url.
    """

    def __init__(self, licenses):
        self.licenses = licenses

    def to_dict(self):
        """
        Return a shallow copy of the license pairs.

        _BiothingsESMetadataReader.get_licenses calls this on the combined
        result; without it that call fails with AttributeError.
        """
        return dict(self.licenses)

    def __add__(self, other):
        """Return a new instance; entries of ``other`` win on source collisions."""
        licenses = dict(self.licenses)
        licenses.update(other.licenses)
        return BiothingLicenses(licenses)
[docs]
class BiothingMappings(BiothingMetaProp):
    """
    Field properties of an index that can be combined with ``+``.

    Args:
        properties: dictionary of field name to field definition.
    """

    def __init__(self, properties):
        self.properties = properties

    def to_dict(self):
        """
        Return a shallow copy of the field properties.

        _BiothingsESMetadataReader.get_mappings calls this on the combined
        result; without it that call fails with AttributeError.
        """
        return dict(self.properties)

    def __add__(self, other):
        """Return a new instance; entries of ``other`` win on field collisions."""
        mappings = dict(self.properties)
        mappings.update(other.properties)
        return BiothingMappings(mappings)
[docs]
class BiothingHubMeta(BiothingMetaProp):
    """
    Hub metadata of a biothing type: type, build date, build version,
    sources and stats. Instances can be combined with ``+``.
    """

    def __init__(self, **metadata):  # dict
        build_date = metadata.get("build_date")
        if build_date and isinstance(build_date, str):
            # non-empty strings are parsed and converted with astimezone()
            build_date = dtparse(build_date).astimezone()

        self.biothing_type = metadata.get("biothing_type")
        self.build_date = build_date
        self.build_version = metadata.get("build_version")
        self.src = metadata.get("src", {})
        self.stats = metadata.get("stats", {})

    def to_dict(self):
        """Export the fields as a dictionary; a datetime build_date becomes an ISO string."""
        build_date = self.build_date
        if isinstance(build_date, datetime):
            build_date = build_date.isoformat()
        return {
            "biothing_type": self.biothing_type,
            "build_date": build_date,
            "build_version": self.build_version,
            "src": self.src,
            "stats": self.stats,
        }

    def __add__(self, other):
        """
        Combine two metadata objects into a new one.

        Differing types or versions become "__multiple__", the later build
        date is kept, sources are merged (``other`` wins on collisions) and
        numeric stats sharing a key are summed.
        """
        # type and version collapse to a marker when the two sides disagree
        biothing_type = "__multiple__" if other.biothing_type != self.biothing_type else self.biothing_type
        build_version = "__multiple__" if other.build_version != self.build_version else self.build_version

        # take the latest build_date, or the only one that is set
        latest = self.build_date
        candidate = other.build_date
        if candidate:
            if not latest or candidate > latest:
                latest = candidate
        if isinstance(latest, datetime):
            latest = latest.isoformat()

        # combine source field
        sources = dict(self.src)
        sources.update(other.src)

        # add up stats field
        numeric = (int, float)
        totals = dict(self.stats)
        for name, amount in other.stats.items():
            if name not in totals:  # new key
                totals[name] = amount
            elif isinstance(totals[name], numeric) and isinstance(amount, numeric):
                totals[name] = totals[name] + amount
            # shared keys with a non-numeric side keep the value of self

        return BiothingHubMeta(
            biothing_type=biothing_type,
            build_date=latest,
            build_version=build_version,
            src=sources,
            stats=totals,
        )