Source code for biothings.hub.datatransform.datatransform_api
"""
DataTransformAPI - classes around API-based key lookup.
"""
# pylint: disable=E0401, E0611
import copy
import biothings_client
from biothings.hub.datatransform.datatransform import DataTransform, DataTransformEdge, IDStruct, nested_lookup
from biothings.utils.loggers import get_logger
[docs]
class DataTransformAPI(DataTransform):
    """
    Perform key lookup or key conversion from one key type to another using
    an API endpoint as a data source.
    This class uses biothings APIs to convert from one key type to another.
    Base classes are used with the decorator syntax shown below::
        @IDLookupMyChemInfo(input_types, output_types)
        def load_document(doc_lst):
            for d in doc_lst:
                yield d
    Lookup fields are configured in the 'lookup_fields' object, examples of which
    can be found in 'IDLookupMyGeneInfo' and 'IDLookupMyChemInfo'.
    Required Options:
        - input_types
            - 'type'
            - ('type', 'nested_source_field')
            - [('type1', 'nested.source_field1'), ('type2', 'nested.source_field2'), ...]
        - output_types:
            - 'type'
            - ['type1', 'type2']
    Additional Options: see DataTransform class
    """
    batch_size = 10
    default_source = "_id"
    lookup_fields = {}
    def __init__(self, input_types, output_types, *args, **kwargs):
        """
        Initialize the IDLookupAPI object.

        :param input_types: input key type(s), forwarded to DataTransform
        :param output_types: output key type(s), forwarded to DataTransform
        :param args: extra positional options forwarded to DataTransform
        :param kwargs: extra keyword options forwarded to DataTransform
        """
        # _generate_return_fields() logs through self.logger, so the logger
        # must exist before that call.  It used to be assigned only at the
        # end of this method, after it had already been used.
        logger, _ = get_logger("keylookup_api")
        self.logger = logger
        self._generate_return_fields()
        super(DataTransformAPI, self).__init__(input_types, output_types, *args, **kwargs)
        # default value of None for client
        self.client = None
        # Keep track of one_to_many relationships
        self.one_to_many_cnt = 0
        # Assign again after the parent initializer so that this class's
        # logger is the one in effect even if the parent installed its own.
        self.logger = logger
    def _valid_input_type(self, input_type):
        """
        Check if the input_type is valid
        :param input_type:
        :return:
        """
        if not isinstance(input_type, str):
            return False
        return input_type.lower() in self.lookup_fields.keys()
    def _valid_output_type(self, output_type):
        """
        Check if the output_type is valid
        :param output_type:
        :return:
        """
        if not isinstance(output_type, str):
            return False
        return output_type.lower() in self.lookup_fields.keys()
    def _generate_return_fields(self):
        """
        Generate the return_fields member variable from the lookup_fields dictionary.
        :return:
        """
        self.return_fields = ""
        for k in self.lookup_fields:
            for field in self._get_lookup_field(k):
                self.return_fields += field + ","
        self.logger.debug("IDLookupAPI return_fields:  {}".format(self.return_fields))
[docs]
    def key_lookup_batch(self, batchiter):
        """
        Look up all keys for ids given in the batch iterator (1 block)
        :param batchiter:  1 lock of records to look up keys for
        :return:
        """
        id_lst, doc_cache = self._build_cache(batchiter)
        self.logger.info("key_lookup_batch num. id_lst items:  {}".format(len(id_lst)))
        query_res = self._query_many(id_lst)
        qm_struct = self._parse_querymany(query_res)
        return self._replace_keys(qm_struct, doc_cache)
    def _build_cache(self, batchiter):
        """
        Build an id list and document cache for documents read from the
        batch iterator.
        :param batchiter:  an iterator for a batch of documents.
        :return:
        """
        id_lst = []
        doc_cache = []
        for doc in batchiter:
            # handle skip logic
            if self.skip_w_regex and self.skip_w_regex.match(doc["_id"]):
                pass
            else:
                for input_type in self.input_types:
                    val = DataTransformAPI._nested_lookup(doc, input_type[1])
                    if val:
                        id_lst.append('"{}"'.format(val))
            # always place the document in the cache
            doc_cache.append(doc)
        return list(set(id_lst)), doc_cache
    def _query_many(self, id_lst):
        """
        Call the biothings_client querymany function with a list of identifiers
        and output fields that will be returned.
        :param id_lst: list of identifiers to query
        :return:
        """
        # Query MyGene.info
        # self.logger.debug("query_many scopes:  {}".format(self.lookup_fields[self.input_type]))
        scopes = []
        for input_type in self.input_types:
            for field in self._get_lookup_field(input_type[0]):
                scopes.append(field)
        client = self._get_client()
        return client.querymany(
            id_lst,
            scopes=scopes,
            fields=self.return_fields,
            as_generator=True,
            returnall=True,
            size=self.batch_size,
        )
    def _parse_querymany(self, query_res):
        """
        Parse the querymany results from the biothings_client into a structure
        that will later be used for document key replacement.
        :param query_res: querymany results
        :return:
        """
        # self.logger.debug("QueryMany Structure:  {}".format(query_res))
        qm_struct = {}
        for q_out in query_res["out"]:
            query = q_out["query"]
            val = self._parse_h(q_out)
            if val:
                if query not in qm_struct.keys():
                    qm_struct[query] = [val]
                else:
                    self.one_to_many_cnt += 1
                    qm_struct[query] = qm_struct[query] + [val]
        # self.logger.debug("parse_querymany num qm_struct keys: {}"\
        #        .format(len(qm_struct.keys())))
        # self.logger.info("parse_querymany running one_to_many_cnt: {}"\
        #        .format(self.one_to_many_cnt))
        # self.logger.debug("parse_querymany qm_struct: {}"\
        #        .format(qm_struct.keys()))
        return qm_struct
    def _parse_h(self, hit):
        """
        Parse a single hit from the API.
        :param hit:
        :return: dictionary of keys
        """
        for output_type in self.output_types:
            for doc_field in self._get_lookup_field(output_type):
                val = DataTransformAPI._nested_lookup(hit, doc_field)
                if val:
                    return val
        return None
    def _replace_keys(self, qm_struct, doc_cache):
        """
        Build a new list of documents to return that have their keys
        replaced by answers specified in the qm_structure which
        was built earlier.
        :param qm_struct: structure of keys from _parse_querymany
        :param doc_cache: cache of documents that will have keys replaced.
        :return:
        """
        # Replace the keys and build up a new result list
        res_lst = []
        for doc in doc_cache:
            new_doc = None
            for input_type in self.input_types:
                # doc[input_type[1]] must be typed to a string because
                # qm_struct.keys are always strings
                val = DataTransformAPI._nested_lookup(doc, input_type[1])
                if val in qm_struct.keys():
                    for key in qm_struct[val]:
                        new_doc = copy.deepcopy(doc)
                        new_doc["_id"] = key
                        res_lst.append(new_doc)
                # Break out if an input type was used.
                if new_doc:
                    break
            if not new_doc and (
                (self.skip_w_regex and self.skip_w_regex.match(doc["_id"])) or not self.skip_on_failure
            ):
                res_lst.append(doc)
        self.logger.info("_replace_keys:  Num of documents yielded:  {}".format(len(res_lst)))
        # Yield the results
        for res in res_lst:
            yield res
    def _get_lookup_field(self, field):
        """
        Getter for lookup fields which may be either a string or a list of string fields.
        :param field: the name of the field to lookup
        :return:
        """
        if field not in self.lookup_fields.keys():
            raise KeyError(f"provided field {field} is not in self.lookup_fields")
        if isinstance(self.lookup_fields[field], str):
            return [self.lookup_fields[field]]
        return self.lookup_fields[field]
    def _get_client(self):
        """get biothings_client"""
        raise NotImplementedError("_get_client not implemented in the super class")
[docs]
class DataTransformMyChemInfo(DataTransformAPI):
    """Single key lookup for MyChemInfo"""

    # Maps each supported key type to the API field (or list of fields)
    # that is used both as query scope and as returned field.
    lookup_fields = {
        "unii": "unii.unii",
        "rxnorm": ["unii.rxcui"],
        "drugbank": "drugbank.drugbank_id",
        "chebi": "chebi.chebi_id",
        "chembl": "chembl.molecule_chembl_id",
        "pubchem": "pubchem.cid",
        "drugname": [
            "drugbank.name",
            "unii.preferred_term",
            "chebi.chebi_name",
            "chembl.pref_name",
        ],
        "inchi": [
            "drugbank.inchi",
            "chembl.inchi",
            "pubchem.inchi",
        ],
        "inchikey": [
            "drugbank.inchi_key",
            "chembl.inchi_key",
            "pubchem.inchi_key",
        ],
    }
    # The order of output_types decides the priority
    # of the key types we used to get _id value
    output_types = ["inchikey", "unii", "rxnorm", "drugbank", "chebi", "chembl", "pubchem", "drugname"]

    def __init__(self, input_types, output_types=None, skip_on_failure=False, skip_w_regex=None):
        """
        Initialize the class by setting up the parent lookup object.

        :param input_types: input key type(s), see DataTransformAPI
        :param output_types: output key type(s); the class-level
            'output_types' priority list is used when omitted or empty
        :param skip_on_failure: forwarded to the parent initializer
        :param skip_w_regex: forwarded to the parent initializer
        """
        _output_types = output_types or self.output_types
        # Forward the options by keyword.  DataTransformAPI passes extra
        # arguments straight through to DataTransform, whose signature is not
        # defined in this file; positional forwarding would bind these values
        # to whatever its third and fourth parameters happen to be.
        super(DataTransformMyChemInfo, self).__init__(
            input_types,
            _output_types,
            skip_on_failure=skip_on_failure,
            skip_w_regex=skip_w_regex,
        )

    def _get_client(self):
        """
        Get Client - return a client appropriate for IDLookup

        The "drug" client is created on first use and cached on the instance.
        This method must be defined in the child class.  It is an artifact
        of multithreading.
        :return: the cached biothings_client instance
        """
        if not self.client:
            self.client = biothings_client.get_client("drug")
        return self.client
[docs]
class BiothingsAPIEdge(DataTransformEdge):
    """
    APIEdge - IDLookupEdge object for API calls
    """
    # define in subclass
    client_name = None
    def __init__(self, lookup, fields, weight=1, label=None, url=None):
        # pylint: disable=R0913
        """
        Set up the edge with its query scopes and returned fields.

        :param lookup: API field, or list of fields, used as query scopes
        :param fields: API field, or list of fields, to return
        :param weight: edge weight, stored as given
        :param label: passed to the parent edge initializer
        :param url: optional client URL, stored as given
        :raises TypeError: when lookup or fields is neither str nor list
        """
        super(BiothingsAPIEdge, self).__init__(label)
        self.init_state()
        # A single string is normalized to a one-element list.
        if isinstance(lookup, str):
            lookup = [lookup]
        elif not isinstance(lookup, list):
            raise TypeError("scopes argument must be str or list")
        self.scopes = lookup
        if isinstance(fields, str):
            fields = [fields]
        elif not isinstance(fields, list):
            raise TypeError("fields argument must be str or list")
        self.fields = fields
        self.weight = weight
        self.url = url
[docs]
    def init_state(self):
        """initialize state - pickleable member variables"""
        self._state = {"client": None, "logger": None}
    @property
    def client(self):
        """property getter for client"""
        if not self._state["client"]:
            try:
                self.prepare_client()
            except NotImplementedError:
                # if accessed but not ready, then just ignore and return invalid value for a client
                return None
        return self._state["client"]
[docs]
    def prepare_client(self):
        """
        Load the biothings_client for the class
        :return:
        """
        if not self.client_name:
            raise NotImplementedError("Define client_name in subclass")
        if self.url:
            self._state["client"] = biothings_client.get_client(self.client_name, url=self.url)
        else:
            self._state["client"] = biothings_client.get_client(self.client_name)
        self.logger.info("Registering biothings_client {}".format(self.client_name))
[docs]
    def edge_lookup(self, keylookup_obj, id_strct, debug=False):
        """
        Follow an edge given a key.
        This method uses the data in the edge_object
        to find one key to another key using an api.
        :param edge:
        :param key:
        :return:
        """
        # If no keys were passed return an empty idstruct_class
        # pylint: disable=C1801
        if not len(id_strct):
            return keylookup_obj.idstruct_class()
        # query the api
        query_res = self._query_many(keylookup_obj, id_strct)
        new_id_strct = self._parse_querymany(keylookup_obj, query_res, id_strct, self.fields, debug)
        return new_id_strct
    def _query_many(self, keylookup_obj, id_strct):
        """
        Run a biothings_client querymany call for the ids of an IDStruct.

        Every id in id_strct.id_lst is wrapped in double quotes and queried
        with this edge's scopes and fields.
        :param keylookup_obj: object providing the 'batch_size' of the query
        :param id_strct: IDStruct holding the identifiers to query
        :return: whatever the client's querymany call returns
        :raises TypeError: when id_strct is not an IDStruct
        """
        if not isinstance(id_strct, IDStruct):
            raise TypeError("id_strct shouldb be of type IDStruct")
        quoted_ids = ['"{}"'.format(key) for key in id_strct.id_lst]
        return self.client.querymany(
            quoted_ids,
            scopes=self.scopes,
            fields=self.fields,
            as_generator=True,
            returnall=True,
            size=keylookup_obj.batch_size,
            verbose=False,
        )
    def _parse_querymany(self, keylookup_obj, query_res, id_strct, fields, debug):
        # pylint: disable=R0913, W0613
        """
        Turn querymany results into a new IDStruct of converted identifiers.

        For every hit and every field with a truthy value, each
        (original id, current id) pair of id_strct whose current id equals
        the hit's query (compared as strings) gets the value added under its
        original id.
        :param keylookup_obj: unused
        :param query_res: querymany results; its "out" entry is iterated
        :param id_strct: identifiers that were queried
        :param fields: fields to read from each hit
        :param debug: when True, debug information is imported from id_strct
            and recorded for every added value
        :return: the new IDStruct
        """
        converted = IDStruct()
        # Keep the old debug information
        if debug:
            converted.import_debug(id_strct)
        # pylint: disable=R1702
        for hit in query_res["out"]:
            term = hit["query"]
            for field in fields:
                val = nested_lookup(hit, field)
                if not val:
                    continue
                for orig_id, curr_id in id_strct:
                    # query is always a string, so this check requires conversion
                    if term != str(curr_id):
                        continue
                    converted.add(orig_id, val)
                    # save debug information if the option is set
                    if debug:
                        converted.set_debug(orig_id, self.label, val)
        return converted
[docs]
class MyChemInfoEdge(BiothingsAPIEdge):
    """
    Edge that converts identifiers through the MyChem.info API.
    """

    client_name = "drug"

    def __init__(self, lookup, field, weight=1, label=None, url=None):
        # pylint: disable=R0913, W0235
        """
        :param lookup: The field in the API to search with the input identifier.
        :type lookup: str
        :param field: The field in the API to convert to.
        :type field: str
        :param weight: Weights are used to prefer one path over another.
                       The path with the lowest weight is preferred.
                       The default weight is 1.
        :type weight: int
        :param label: forwarded unchanged to BiothingsAPIEdge
        :param url: forwarded unchanged to BiothingsAPIEdge
        """
        super().__init__(lookup, field, weight=weight, label=label, url=url)
[docs]
class MyGeneInfoEdge(BiothingsAPIEdge):
    """
    Edge that converts identifiers through the MyGene.info API.
    """

    client_name = "gene"

    def __init__(self, lookup, field, weight=1, label=None, url=None):
        # pylint: disable=R0913, W0235
        """
        :param lookup: The field in the API to search with the input identifier.
        :type lookup: str
        :param field: The field in the API to convert to.
        :type field: str
        :param weight: Weights are used to prefer one path over another.
                       The path with the lowest weight is preferred.
                       The default weight is 1.
        :type weight: int
        :param label: forwarded unchanged to BiothingsAPIEdge
        :param url: forwarded unchanged to BiothingsAPIEdge
        """
        super().__init__(lookup, field, weight=weight, label=label, url=url)
####################
[docs]
class DataTransformMyGeneInfo(DataTransformAPI):
    """deprecated"""

    # Maps each supported key type to the API field used both as query
    # scope and as returned field.
    lookup_fields = {
        "ensembl": "ensembl.gene",
        "entrezgene": "entrezgene",
        "symbol": "symbol",
        "uniprot": "uniprot.Swiss-Prot",
    }

    def __init__(
        self,
        input_types,
        output_types=None,
        skip_on_failure=False,
        skip_w_regex=None,
    ):
        # pylint: disable=W0102
        """
        Initialize the class by setting up the parent lookup object.

        :param input_types: input key type(s), see DataTransformAPI
        :param output_types: output key type(s); ["entrezgene"] is used when
            omitted or empty
        :param skip_on_failure: forwarded to the parent initializer
        :param skip_w_regex: forwarded to the parent initializer
        """
        output_types = output_types or ["entrezgene"]
        # Forward the options by keyword.  DataTransformAPI passes extra
        # arguments straight through to DataTransform, whose signature is not
        # defined in this file; positional forwarding would bind these values
        # to whatever its third and fourth parameters happen to be.
        super(DataTransformMyGeneInfo, self).__init__(
            input_types,
            output_types,
            skip_on_failure=skip_on_failure,
            skip_w_regex=skip_w_regex,
        )

    def _get_client(self):
        """
        Get Client - return a client appropriate for IDLookup

        The "gene" client is created on first use and cached on the instance.
        This method must be defined in the child class.  It is an artifact
        of multithreading.
        :return: the cached biothings_client instance
        """
        if not self.client:
            self.client = biothings_client.get_client("gene")
        return self.client