Source code for biothings.web.query.formatter
"""
    Search Result Formatter
    Transform the raw query result into consumption-friendly
    structures by possibly removing from, adding to, and/or
    flattening the raw response from the database engine for
    one or more individual queries.
"""
from collections import UserDict, defaultdict
from biothings.utils.common import dotdict, traverse
from biothings.utils.jmespath import options as jmp_options
[docs]
class FormatterDict(UserDict):
    """A ``UserDict`` with helpers for converting stored values in place."""

    def wrap(self, key, kls):
        """
        Replace the value stored under ``key`` with its ``kls``-wrapped form.

        A list value is wrapped element by element; any other value is
        wrapped as a whole. A missing key raises ``KeyError``.
        """
        current = self.get(key)
        if not isinstance(current, list):
            # Index instead of reusing ``current`` so that an absent key
            # still surfaces as a KeyError.
            self[key] = kls(self[key])
            return
        self[key] = list(map(kls, current))
[docs]
class ESResultFormatter(ResultFormatter):
    """Class to transform the results of the Elasticsearch query generated prior in the pipeline.
    This contains the functions to extract the final document from the elasticsearch query result
    in `Elasticsearch Query`_.  This also contains the code to flatten a document etc.
    """
    class _Hits(Hits):
        """Hits container that checks the Elasticsearch response shape on creation."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # Fail early when the wrapped payload does not look like an
            # Elasticsearch search response.
            payload = self.data
            assert "hits" in payload
            outer = payload["hits"]
            assert "total" in outer
            assert "hits" in outer
            for entry in outer["hits"]:
                assert "_source" in entry
    class _Doc(Doc):
        # Document type that transform() wraps around every individual hit
        # (via response.wrap("hits", self._Doc)). It adds nothing on top of
        # Doc; presumably a hook for subclasses to customize -- confirm.
        pass
    def __init__(self, licenses=None, license_transform=None, field_notes=None, excluded_keys=()):
        # do not directly use the "or" syntax
        # to turn a None object to a {}, that
        # would also replace external empty {}
        # that will later get populated with
        # contents when data is available.
        if licenses is None:
            licenses = {}
        if license_transform is None:
            license_transform = {}
        if field_notes is None:
            field_notes = {}
        # license settings
        # ---------------------
        # this feature is turned on by default,
        # it appends license fields to the documents
        # by looking at its first level field names
        # or the transformed field name equivalences.
        self.licenses = licenses
        # example:
        # {
        #     "exac": "http://example.com/licenseA",
        #     "snpeff": "http://example.com/licenseB"
        # }
        self.license_transform = license_transform
        # example:
        # {
        #     "exac_nontcga": "exac",
        #     "snpeff.ann": "snpeff"
        # }
        # mapping dispaly settings
        # -------------------------
        self.field_notes = field_notes
        self.excluded_keys = excluded_keys
    # Kept for compatibility: exposes the module-level ``traverse`` helper as
    # a static method. _transform_hit and _dotfield reach it through
    # ``self.traverse`` / ``cls.traverse``.
    traverse = staticmethod(traverse)
[docs]
    def transform(self, response, **options):
        """
        Transform the query response to a user-friendly structure.
        Mainly deconstruct the elasticsearch response structure and
        hand over to transform_doc to apply the options below.

        A list ``response`` is treated as the results of multiple queries:
        every item is transformed individually and the outcomes are merged
        into one flat list (or a dict when ``with_total`` is set). A dict
        ``response`` is treated as a single Elasticsearch search result.
        Any other type raises TypeError.

        Options:
            # generic transformations for dictionaries
            # ------------------------------------------
            dotfield: flatten a dictionary using dotfield notation
            _sorted: sort keys alphabetically in ascending order
            always_list: ensure the fields specified are lists or wrapped in a list
            allow_null: ensure the fields specified are present in the result,
                        the fields may be provided as type None or [].
            # additional multisearch result transformations
            # ------------------------------------------------
            template: base dict for every result, for example: {"success": true}
            templates: a different base for every result, replaces the setting above
            template_hit: a dict to update every positive hit result, default: {"found": true}
            template_miss: a dict to update every query with no hit, default: {"found": false}
            # document format and content management
            # ---------------------------------------
            biothing_type: result document type to apply customized transformation.
                        for example, add license field basing on document type's metadata.
            one: return the individual document if there's only one hit. ignore this setting
                if there are multiple hits. return None if there is no hit. this option is
                not effective when aggregation results are also returned in the same query.
            native: bool, if the returned result is in python primitive types.
            version: bool, if _version field is kept.
            score: bool, if _score field is kept.
            with_total: bool, if True, the response will include max_total documents,
                and a message to tell how many query terms return greater than the max_size of hits.
                The default is False.
                An example when with_total is True:
                {
                    'max_total': 100,
                    'msg': '12 query terms return > 1000 hits, using from=1000 to retrieve the remaining hits',
                    'hits': [...]
                }
        """
        options = dotdict(options)
        if isinstance(response, list):
            max_total = 0
            count_query_exceed_max_size = 0
            # If options.size isn't set, the default number of hits returned by the ES is 10.
            # Ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-multi-search.html#search-multi-search-api-request-body  # noqa: F501
            max_size = options.size or 10
            responses_ = []
            options.pop("one", None)  # ignore
            template = options.pop("template", {})
            templates = options.pop("templates", [template] * len(response))
            template_hit = options.pop("template_hit", dict(found=True))
            template_miss = options.pop("template_miss", dict(found=False))
            responses = [self.transform(res, **options) for res in response]
            for tpl, res in zip(templates, responses):
                if options.with_total:
                    total = res.get("total", {}).get("value") or 0
                    if total > max_total:
                        max_total = total
                    if total > max_size:
                        count_query_exceed_max_size += 1
                for _res in res if isinstance(res, list) else [res]:
                    assert isinstance(_res, dict)
                    if _res and "hits" not in _res:
                        hit_ = dict(tpl)
                        hit_.update(template_hit)
                        hit_.update(_res)
                        responses_.append(hit_)
                        continue
                    if not _res or not _res["hits"]:
                        # Work on a copy: the template belongs to the caller
                        # and, with the default ``templates``, is one dict
                        # shared by every query. Updating it in place would
                        # leak the "miss" markers into later results.
                        miss_ = dict(tpl)
                        miss_.update(template_miss)
                        responses_.append(miss_)
                        continue
                    for hit in _res["hits"]:
                        hit_ = dict(tpl)
                        hit_.update(template_hit)
                        hit_.update(hit)
                        responses_.append(hit_)
            # drop results that ended up empty
            response_ = list(filter(None, responses_))
            if options.with_total:
                response_ = {
                    "max_total": max_total,
                    "hits": response_,
                }
                if count_query_exceed_max_size > 0:
                    _from = (options.get("from") or 0) + max_size
                    response_["msg"] = (
                        f"{count_query_exceed_max_size} query terms return > {max_size} hits, "
                        f"using from={_from} to retrieve the remaining hits"
                    )
            return response_
        if isinstance(response, dict):
            response = self._Hits(response)
            response.collapse("hits")
            response.exclude(("_shards", "_node", "timed_out"))
            response.wrap("hits", self._Doc)
            for hit in response["hits"]:
                hit.collapse("_source")
                # 'sort' is introduced when sorting
                hit.exclude(("_index", "_type", "sort"))
                self._transform_hit(hit, options)
            if options.get("native", True):
                # unwrap the helper containers back to plain python types
                response["hits"] = [hit.data for hit in response["hits"]]
                response = response.data
            if "aggregations" in response:
                self.transform_aggs(response["aggregations"])
                response["facets"] = response.pop("aggregations")
                hits = response.pop("hits")  # move key order
                if hits:  # hide "hits" field when size=0
                    response["hits"] = hits
            elif options.get("one"):
                # prefer one-level presentation
                # or structures as simple as possible
                if len(response["hits"]) == 1:
                    response = response["hits"][0]
                elif len(response["hits"]) == 0:
                    response = None
                else:  # show a list of docs
                    response = response["hits"]
            return response
        raise TypeError(f"response must be a list or a dict, not {type(response).__name__}")
    def _transform_hit(self, doc, options):
        """
        Apply the per-document transformations, in place, to a hit like:
        {
            "_id": ... ,
            "_index": ... ,
            ...
        }
        "_version" is dropped unless the "version" option is set, and
        "_score" is dropped when the "score" option is turned off. Every
        (path, object) pair produced by traverse then goes through
        transform_hit and the optional allow_null / always_list / _sorted
        steps. The dotfield flattening runs last on the whole document.
        """
        keep_version = options.get("version", False)
        if not keep_version:
            doc.pop("_version", None)
        keep_score = options.get("score", True)
        if not keep_score:
            doc.pop("_score", None)

        for path, obj in self.traverse(doc):
            self.transform_hit(path, obj, options)
            null_fields = options.allow_null
            if null_fields:
                self._allow_null(path, obj, null_fields)
            list_fields = options.always_list
            if list_fields:
                self._always_list(path, obj, list_fields)
            if options._sorted:
                self._sorted(path, obj)

        if options.dotfield:
            self._dotfield(doc, options)
    @staticmethod
    def _allow_null(path, obj, fields):
        """
        The specified fields should be set to None if it does not exist.
        When flattened, the field could be converted to an empty list.

        ``path`` is the dotted path of ``obj`` within the document and
        ``fields`` the dotted field names to guarantee. Only a field that is
        a direct child of ``obj`` is added here; deeper fields are left to
        the call that handles their own parent object. Objects that are
        neither dict nor Doc are ignored.
        """
        if not isinstance(obj, (dict, Doc)):
            return
        for field in fields or []:
            if not field.startswith(path):
                continue
            remainder = field[len(path) :]  # noqa: E203
            # Below the root, the prefix has to end on a "." separator.
            # Otherwise the path "exac" would also claim the field
            # "exac_nontcga", and a field equal to the path itself would
            # yield an empty key.
            if path and not remainder.startswith("."):
                continue
            key = remainder.lstrip(".")
            if key and "." not in key and key not in obj:
                obj[key] = None
    @staticmethod
    def _always_list(path, obj, fields):
        """
        The specified fields, if exist, should be set to a list type.
        None converts to an empty list [] instead of [None].

        ``path`` is the dotted path of ``obj`` within the document and
        ``fields`` the dotted field names to convert. Only a field that is a
        direct child of ``obj`` is converted here. Objects that are neither
        dict nor Doc are ignored, and so is a missing ``fields`` value.
        """
        if not isinstance(obj, (dict, Doc)):
            return
        for field in fields or []:
            if not field.startswith(path):
                continue
            remainder = field[len(path) :]  # noqa: E203
            # Below the root, the prefix has to end on a "." separator.
            # Otherwise the path "a" would also claim the field "ab" and
            # convert an unrelated key "b" of this object.
            if path and not remainder.startswith("."):
                continue
            key = remainder.lstrip(".")
            if key in obj and not isinstance(obj[key], list):
                value = obj[key]
                obj[key] = [] if value is None else [value]
    @staticmethod
    def _sorted(_, obj):
        """
        Reorder the keys of a dict or Doc in ascending order, in place.

        Any other kind of object is left untouched. This is best effort:
        an error raised while sorting or rebuilding is swallowed.
        """
        if not isinstance(obj, (dict, Doc)):
            return
        try:
            ordered = sorted(obj.items())
            obj.clear()
            obj.update(ordered)
        except Exception:
            pass
    @classmethod
    def _dotfield(cls, dic, options):
        """
        Flatten a dictionary in place using dotfield notation.
        See biothings.utils.common.traverse for examples.

        Values sharing the same dotted path are collected into one list.
        A path with a single value keeps that bare value, unless the path is
        listed in ``options.always_list``. In the list form, the null
        placeholders (None and empty lists) are removed.
        """
        flattened = defaultdict(list)
        for path, value in cls.traverse(dic, leaf_node=True):
            flattened[path].append(value)
        always_list = options.always_list or []
        for key, values in flattened.items():
            if len(values) == 1 and key not in always_list:
                flattened[key] = values[0]
            else:  # multi-element list
                # Only drop the placeholders for "no value". A plain
                # truthiness filter would also discard legitimate values
                # such as 0, False or "".
                flattened[key] = [
                    value
                    for value in values
                    if value is not None and not (isinstance(value, list) and not value)
                ]
                if options._sorted:
                    cls._sorted(key, flattened[key])
        dic.clear()
        dic.update(flattened)
[docs]
    def transform_hit(self, path, doc, options):
        """
        Transform an individual search hit result.
        By default add licenses for the configured fields.
        If a source has a license url in its metadata,
        Add "_license" key to the corresponding fields.
        Support dot field representation field alias.
        If we have the following settings in web_config.py
        LICENSE_TRANSFORM = {
            "exac_nontcga": "exac",
            "snpeff.ann": "snpeff"
        },
        Then GET /v1/variant/chr6:g.38906659G>A should look like:
        {
            "exac": {
                "_license": "http://bit.ly/2H9c4hg",
                "af": 0.00002471},
            "exac_nontcga": {
                "_license": "http://bit.ly/2H9c4hg",         <---
                "af": 0.00001883}, ...
        }
        And GET /v1/variant/chr14:g.35731936G>C could look like:
        {
            "snpeff": {
                "_license": "http://bit.ly/2suyRKt",
                "ann": [{"_license": "http://bit.ly/2suyRKt", <---
                        "effect": "intron_variant",
                        "feature_id": "NM_014672.3", ...},
                        {"_license": "http://bit.ly/2suyRKt", <---
                        "effect": "intron_variant",
                        "feature_id": "NM_001256678.1", ...}, ...]
            }, ...
        }
        The arrow marked fields would not exist without the setting lines.
        """
        licenses = self.licenses.get(options.biothing_type, {})
        if path in self.license_transform:
            path = self.license_transform[path]
        if path in licenses and isinstance(doc, dict):
            doc["_license"] = licenses[path]
        if options.jmespath:
            self.trasform_jmespath(path, doc, options)
[docs]
    def trasform_jmespath(self, path: str, doc, options) -> None:
        """Transform any target field in doc using jmespath query syntax.
        The jmespath query parameter value should have the pattern of "<target_list_fieldname>|<jmespath_query_expression>"
        <target_list_fieldname> can be any sub-field of the input doc using dot notation, e.g. "aaa.bbb".
          If empty or ".", it will be the root field.
        The flexible jmespath syntax allows to filter/transform any nested objects in the input doc on the fly.
        The output of the jmespath transformation will then be used to replace the original target field value.
        Examples:
             * filtering an array sub-field
                 jmespath=tags|[?name==`Metadata`]  # filter tags array by name field
                 jmespath=aaa.bbb|[?(sub_a==`val_a`||sub_a==`val_aa`)%26%26sub_b==`val_b`]    # use %26%26 for &&

        Nothing happens when ``path`` is not the parent path of the target
        field, when ``doc`` is not a dict-like object, or when the target
        value is missing or empty.
        """
        # options.jmespath is already validated and processed as a tuple
        # see biothings.web.options.manager.Coverter.translate
        parent_path, target_field, jmes_query = options.jmespath
        if path != parent_path:
            # we handle jmespath transformation at its parent field level,
            # so that we can set a transformed value
            return
        if not isinstance(doc, (dict, UserDict)):
            # The objects handed in are not always mappings (transform_hit
            # guards its own license step the same way); a non-mapping
            # cannot hold the target field, so there is nothing to do.
            return
        target_field_value = doc.get(target_field) if target_field else doc
        if not target_field_value:
            return
        # pass jmp_options to include our own custom jmespath functions
        transformed_field_value = jmes_query.search(target_field_value, options=jmp_options)
        if target_field:
            doc[target_field] = transformed_field_value
            return
        # if the target field is the root field, we need to replace the whole doc
        # note that we cannot use `doc = transformed_field_value` here, because
        # it will break the reference to the original doc object.
        # Build the replacement first: a result that cannot be turned into
        # a mapping then fails before the doc is emptied, and a result that
        # is the doc itself is not wiped by the clear() below.
        replacement = dict(transformed_field_value)
        doc.clear()
        doc.update(replacement)
[docs]
    def transform_aggs(self, res):
        """
        Transform the aggregations field and make it more presentable.
        For example, these are the fields of a two level nested aggregations:
            aggregations.<term>.doc_count_error_upper_bound
            aggregations.<term>.sum_other_doc_count
            aggregations.<term>.buckets.key
            aggregations.<term>.buckets.key_as_string
            aggregations.<term>.buckets.doc_count
            aggregations.<term>.buckets.<nested_term>.* (recursive)
        After the transformation, we'll have:
            facets.<term>._type
            facets.<term>.total
            facets.<term>.missing
            facets.<term>.other
            facets.<term>.terms.count
            facets.<term>.terms.term
            facets.<term>.terms.<nested_term>.* (recursive)
        Note the first level key change doesn't happen here.
        """
        for facet in res:
            res[facet]["_type"] = "terms"  # a type of ES Bucket Aggs
            res[facet]["terms"] = res[facet].pop("buckets")
            res[facet]["other"] = res[facet].pop("sum_other_doc_count", 0)
            res[facet]["missing"] = res[facet].pop("doc_count_error_upper_bound", 0)
            count = 0
            for bucket in res[facet]["terms"]:
                bucket["count"] = bucket.pop("doc_count")
                bucket["term"] = bucket.pop("key")
                if "key_as_string" in bucket:
                    bucket["term"] = bucket.pop("key_as_string")
                count += bucket["count"]
                # nested aggs
                for agg_k in list(bucket.keys()):
                    if isinstance(bucket[agg_k], dict):
                        bucket.update(self.transform_aggs(dict({agg_k: bucket[agg_k]})))
            res[facet]["total"] = count
        return res
[docs]
    def transform_mapping(self, mapping, prefix=None, search=None):
        """
        Transform Elasticsearch mapping definition to
        user-friendly field definitions metadata results.
        """
        assert isinstance(mapping, dict)
        assert isinstance(prefix, str) or prefix is None
        assert isinstance(search, str) or search is None
        result = {}
        items = list(mapping.items())
        items.reverse()
        while items:
            key, dic = items.pop()
            dic = dict(dic)
            dic.pop("dynamic", None)
            dic.pop("normalizer", None)
            if key in self.field_notes:
                result["notes"] = self.field_notes[key]
            if "copy_to" in dic:
                if "all" in dic["copy_to"]:
                    dic["searched_by_default"] = True
                del dic["copy_to"]
            if "index" not in dic:
                if "enabled" in dic:
                    dic["index"] = dic.pop("enabled")
                else:  # true by default
                    dic["index"] = True
            if "properties" in dic:
                dic["type"] = "object"
                subs = ((".".join((key, k)), v) for k, v in dic["properties"].items())
                items.extend(reversed(list(subs)))
                del dic["properties"]
            if all(
                (
                    not self.excluded_keys or key not in self.excluded_keys,
                    not prefix or key.startswith(prefix),
                    not search or search in key,
                )
            ):
                result[key] = dict(sorted(dic.items()))
        return result