Source code for biothings.hub.datatransform.datatransform_mdb
"""
DataTransform MDB module - class for performing key lookup
using conversions described in a networkx graph.
"""
# pylint: disable=E0401, E0611
import copy
import networkx as nx
from pymongo.collation import Collation
import biothings.utils.mongo as mongo
from biothings.hub.datatransform import DataTransform, DataTransformEdge, IDStruct, nested_lookup
from biothings.utils.loggers import get_logger
[docs]
class MongoDBEdge(DataTransformEdge):
    """
    The MongoDBEdge uses data within a MongoDB collection to convert
    one identifier to another. The input identifier is used to search
    a collection. The output identifier values are read out of that
    collection:
    """
    def __init__(self, collection_name, lookup, field, weight=1, label=None, check_index=True):
        # pylint: disable=R0913
        """
        Configure the edge and, unless disabled, verify that the lookup
        field is covered by an index of the collection.

        :param collection_name: The name of the MongoDB collection.
        :type collection_name: str
        :param lookup: The field that will match the input identifier in the collection.
        :type lookup: str
        :param field: The output identifier field that will be read out of matching documents.
        :type field: str
        :param weight: Weights are used to prefer one path over another.
                       The path with the lowest weight is preferred.
                       The default weight is 1.
        :type weight: int
        :param label: Label forwarded to the base edge class.
        :param check_index: When true (the default), require ``lookup`` to be
                            part of at least one index of the collection.
                            If the collection does not exist, a warning is
                            logged and the check is skipped.
        :type check_index: bool
        :raises ValueError: if the index check runs and ``lookup`` is not
                            a key of any index.
        """
        super(MongoDBEdge, self).__init__(label)
        # unpickleable attributes, grouped
        self.init_state()
        self.collection_name = collection_name
        self.lookup = lookup
        self.field = field
        self.weight = weight
        if not check_index:
            return
        database = self.collection.database
        # Database.collection_names() is deprecated since PyMongo 3.7 and was
        # removed in 4.0; prefer list_collection_names() when the driver has
        # it.  The probe goes through the class because attribute access on
        # a Database instance resolves unknown names to collections.
        if hasattr(type(database), "list_collection_names"):
            existing_names = database.list_collection_names()
        else:
            existing_names = database.collection_names()
        if self.collection_name not in existing_names:
            self.logger.warning("Collection '%s' doesn't exist, can't check indices",
                                self.collection_name)
            return
        indexed_fields = set()
        for idx in self.collection.list_indexes():
            # this could be a composite index, multiple keys being part of the index
            # we'll consider them as individually accessible, but I'm not sure how
            # MongoDB deals with that => TODO check
            indexed_fields.update(idx["key"])
        if self.lookup not in indexed_fields:
            raise ValueError("Field '%s' isn't indexed, this would result in very long "
                             "datatransform process" % self.lookup)
    @property
    def collection(self):
        """getting for collection member variable"""
        if not self._state["collection"]:
            try:
                self.prepare_collection()
            # pylint: disable=W0703
            except Exception:
                # if accessed but not ready, then just ignore and return invalid
                # value for a client
                return None
        return self._state["collection"]
[docs]
    def prepare_collection(self):
        """
        Fetch the collection named ``collection_name`` from the source
        database and cache it in the edge state.

        :return: None
        """
        source_db = mongo.get_src_db()
        self._state["collection"] = source_db[self.collection_name]
        message = "Registering collection:  {}".format(self.collection_name)
        self.logger.info(message)
[docs]
    def edge_lookup(self, keylookup_obj, id_strct, debug=False):
        """
        Follow this edge for every identifier held by ``id_strct``.

        All current identifiers are resolved with a single query against
        the collection (see ``collection_find``); each returned document maps
        a ``lookup`` value onto a ``field`` value.

        :param keylookup_obj: calling object; not used by this implementation.
        :param id_strct: identifiers to convert.
        :type id_strct: IDStruct
        :param debug: when true, debug information is carried over from
                      ``id_strct`` and every conversion is recorded under
                      this edge's label.
        :return: a new IDStruct holding the converted identifiers.
        :raises TypeError: if ``id_strct`` is not an IDStruct.
        """
        if not isinstance(id_strct, IDStruct):
            raise TypeError("edge_lookup id_struct is of the wrong type")
        converted = IDStruct()
        if debug:
            # keep whatever debug trail earlier edges accumulated
            converted.import_debug(id_strct)
        current_ids = id_strct.id_lst
        if not current_ids:
            return converted
        for match in self.collection_find(current_ids, self.lookup, self.field):
            matched_key = nested_lookup(match, self.lookup)
            for orig_id in id_strct.find_right(matched_key):
                out_value = nested_lookup(match, self.field)
                converted.add(orig_id, out_value)
                if debug:
                    converted.set_debug(orig_id, self.label, out_value)
        return converted
[docs]
    def collection_find(self, id_lst, lookup, field):
        """
        Abstract out (as one line) the call to collection.find
        """
        return self.collection.find({lookup: {"$in": id_lst}}, {lookup: 1, field: 1})
[docs]
class CIMongoDBEdge(MongoDBEdge):
    """
    Case-insensitive MongoDBEdge
    """
    def __init__(self, collection_name, lookup, field, weight=1, label=None, check_index=True):
        # pylint: disable=R0913, W0235
        """
        Same parameters as ``MongoDBEdge``.

        ``check_index`` is accepted and forwarded so that the index check
        can be disabled for case-insensitive edges as well.

        NOTE(review): the inherited check only looks at index keys, not at
        the index collation - confirm whether the case-insensitive query
        can use the index that is found.
        """
        super(CIMongoDBEdge, self).__init__(
            collection_name, lookup, field, weight, label, check_index=check_index)
[docs]
    def collection_find(self, id_lst, lookup, field):
        """
        Same query as ``MongoDBEdge.collection_find`` but evaluated with an
        English, strength-2 collation so that matching ignores case.
        """
        cursor = self.collection.find({lookup: {"$in": id_lst}}, {lookup: 1, field: 1})
        case_insensitive = Collation(locale='en', strength=2)
        return cursor.collation(case_insensitive)
[docs]
class DataTransformMDB(DataTransform):
    """
    Convert document identifiers from one type to another.
    """
    # Constants
    batch_size = 1000
    default_source = '_id'
    def __init__(self, graph, *args, **kwargs):
        """
        DataTransformMDB is meant to be used as a decorator class applied
        to the load_data function of a Biothings Uploader.  The load_data
        function yields documents, which are then post processed by call
        and the 'id' key conversion is performed.

        Apart from ``graph``, all arguments are forwarded unchanged to the
        base class constructor.

        :param graph: nx.DiGraph (networkx 2.1) configuration graph
        :param input_types: A list of input types of the form (identifier, field) where
                            identifier matches a node and field is an optional dotstring
                            field for where the identifier should be read from
                            (the default is '_id').
        :param output_types: A priority list of identifiers to convert to. These
                             identifiers should match nodes in the graph.
        :type output_types: list(str)
        :param id_priority_list: A priority list of identifiers to sort input
                                 and output types by.
        :type id_priority_list: list(str)
        :param skip_on_failure: If True, documents where identifier conversion fails
                                will be skipped in the final document list.
        :type skip_on_failure: bool
        :param skip_w_regex: Do not perform conversion if the identifier matches
                             the regular expression provided to this argument. By default,
                             this option is disabled.
        :type skip_w_regex: regular expression (used through ``.match()``;
                            presumably compiled by the base class - confirm)
        :param skip_on_success: If True, documents where identifier conversion succeeds
                                will be skipped in the final document list.
        :type skip_on_success: bool
        :param idstruct_class: Override an internal data structure used by this
                               module (advanced usage)
        :type idstruct_class: class
        :param copy_from_doc: If true then an identifier is copied from the input
                              source document regardless as to whether it matches an
                              edge or not. (advanced usage)
        :type copy_from_doc: bool
        :raises ValueError: if ``graph`` is not an nx.DiGraph or fails
                            ``_validate_graph``.
        """
        is_digraph = isinstance(graph, nx.DiGraph)
        if not is_digraph:
            raise ValueError("key_lookup configuration error:  graph must be of type nx.DiGraph")
        self._validate_graph(graph)
        # the graph and logger are set before the base class initializer runs
        self.graph = graph
        logger_info = get_logger('datatransform')
        self.logger, _unused = logger_info
        super(DataTransformMDB, self).__init__(*args, **kwargs)
        # paths need input_types/output_types, available once the base is set up
        self._precompute_paths()
    def _valid_input_type(self, input_type):
        return input_type.lower() in self.graph.nodes()
    def _valid_output_type(self, output_type):
        return output_type.lower() in self.graph.nodes()
    @staticmethod
    def _validate_graph(graph):
        """
        Check if the input configuration graph graph has a valid structure.
        :param graph: key_lookup configuration graph
        :return:
        """
        # all node names should be lowercase
        for node in graph.nodes():
            if node != node.lower():
                raise ValueError("node object {} is not lowercase".format(node))
        for (vert1, vert2) in graph.edges():
            if 'object' not in graph.edges[vert1, vert2].keys():
                raise ValueError("edge_object for ({}, {}) is missing".format(vert1, vert2))
            edge_object = graph.edges[vert1, vert2]['object']
            if not isinstance(edge_object, DataTransformEdge):
                raise ValueError("edge_object for ({}, {}) is of the wrong type".
                                 format(vert1, vert2))
    def _precompute_paths(self):
        """
        Precompute, for every (input type, output type) combination given on
        initialization, the list of graph paths connecting them.

        The result is stored in ``self.paths``, keyed by
        ``(input identifier, output type)``; each list is ordered by
        increasing path weight.

        :return: None
        """
        self.paths = {}
        for output_type in self.output_types:
            for input_type in self.input_types:
                paths = list(nx.all_simple_paths(self.graph, input_type[0], output_type))
                if not paths:
                    try:
                        # Look for a self-loop.  Indexing the edge raises a
                        # KeyError when the p-to-p edge isn't defined in the graph.
                        # pylint: disable=W0104
                        self.graph.edges[input_type[0], output_type]
                        # all_shortest_paths() returns one element, the
                        # self-looped node, but a pair is needed, hence the "*2"
                        paths = [
                            single * 2
                            for single in nx.all_shortest_paths(
                                self.graph, input_type[0], output_type)
                        ]
                    except (KeyError, nx.NetworkXNoPath):
                        pass
                # Try the paths with the lowest weight first
                paths.sort(key=self._compute_path_weight)
                self.paths[(input_type[0], output_type)] = paths
        # self.logger.debug("All Pre-Computed DataTransform Paths:  {}".format(self.paths))
[docs]
    def key_lookup_batch(self, batchiter):
        # pylint: disable=R0912
        """
        Look up all keys for ids given in the batch iterator (1 block)
        :param batchiter:  1 lock of records to look up keys for
        :return:
        """
        doc_lst = []
        for doc in batchiter:
            # in debug mode, skip all documents not in the debug list
            if self.debug:
                # pylint: disable=C0121
                if self.debug is True or doc['_id'] in self.debug:
                    # set debug information
                    doc['dt_debug'] = {'orig_id': doc['_id']}
                    doc_lst.append(doc)
            else:
                doc_lst.append(doc)
        hit_lst = []
        miss_lst = []
        for doc in doc_lst:
            if self.skip_w_regex and self.skip_w_regex.match(doc['_id']):
                yield doc
            else:
                miss_lst.append(doc)
        # Attempt to reach each destination in order...
        for output_type in self.output_types:
            # Starting with each input_type
            for input_type in self.input_types:
                # self.logger.debug("Attempt Lookup:  from '{}' To '{}'"\
                # .format(input_type[0], output_type))
                if output_type == input_type[0]:
                    # the doc itself has the correct ID,
                    # so either there's a self-loop avail to check this ID is valid
                    if self.graph.has_edge(output_type, output_type):
                        (hit_lst, miss_lst) = self.travel(input_type, output_type, miss_lst)
                    # or if copy is allowed, we get the value from the doc
                    elif self.copy_from_doc:
                        (hit_lst, miss_lst) = self._copy(input_type, miss_lst)
                else:
                    (hit_lst, miss_lst) = self.travel(input_type, output_type, miss_lst)
                if not self.skip_on_success:
                    for doc in hit_lst:
                        yield doc
        # Keep the misses if we do not skip on failure
        if not self.skip_on_failure:
            for doc in miss_lst:
                yield doc
    def _copy(self, input_type, doc_lst):
        """
        Copy ids in the case where input_type == output_type.

        For each document the value found at ``input_type[1]`` becomes the
        new ``_id`` (as a string).  Documents without a truthy value there
        are reported as misses.

        :return: tuple (hit_lst, miss_lst)
        """
        copied, untouched = [], []
        for record in doc_lst:
            found = nested_lookup(record, input_type[1])
            if not found:
                untouched.append(record)
                continue
            # ensure _id is always a str
            record['_id'] = str(found)
            copied.append(record)
            # retain debug information if available (assumed dt_debug already in place)
            if self.debug:
                record['dt_debug']['copy_from'] = (input_type[1], found)
        # Keep a record of IDs copied
        self.histogram.update_io(input_type, input_type, len(copied))
        return (copied, untouched)
    def _compute_path_weight(self, path):
        """
        Helper function to compute the weight of a path, i.e. the sum of the
        ``weight`` of the edge object on every consecutive node pair.

        :param path: sequence of graph nodes
        :return: computed weight
        """
        return sum(
            self.graph.edges[src, dst]['object'].weight
            for (src, dst) in nx.utils.pairwise(path)
        )
[docs]
    def travel(self, input_type, target, doc_lst):
        # pylint: disable=R0914
        """
        Traverse the graph from a start key type to a target key type using
        the precomputed paths, converting the identifiers of ``doc_lst``.

        Paths are tried in their precomputed order.  After each path only
        the identifiers that have not been converted yet are kept for the
        next one.

        :param input_type: (identifier, field) pair: key type to start from
                           and the document field the value is read from
        :param target: key type to end at
        :param doc_lst: documents to convert
        :return: tuple (hit_lst, miss_lst).  ``hit_lst`` holds one deep copy
                 of a document per converted identifier, with ``_id`` set to
                 that identifier as a string; ``miss_lst`` holds the original
                 documents for which nothing was found.
        """
        #self.logger.debug("Travel From '{}' To '{}'".format(input_type[0], target))
        # Keep a running list of all saved hits
        saved_hits = IDStruct()
        # Identifiers still to be converted, seeded from the documents
        pending = self.idstruct_class(input_type[1], doc_lst)
        pairwise = nx.utils.misc.pairwise
        for route in self.paths[(input_type[0], target)]:
            for (vert1, vert2) in pairwise(route):
                edge = self.graph.edges[vert1, vert2]['object']
                ids_before = len(pending)
                pending = self._edge_lookup(edge, pending)
                ids_after = len(pending)
                if ids_before:
                    # self.logger.debug("Edge {} - {}, {} searched returned {}"\
                    #        .format(vert1, vert2, ids_before, ids_after))
                    self.histogram.update_edge(vert1, vert2, ids_after)
            if pending:
                saved_hits += pending
            # reset the state to lookup misses
            pending = self.idstruct_class()
            for doc in doc_lst:
                val = nested_lookup(doc, input_type[1])
                if val and not saved_hits.left(val):
                    pending.add(val, val)
        # Split the documents: those that have had their identifiers
        # replaced and those that were not changed
        debug = self.debug
        hit_lst = []
        miss_lst = []
        for doc in doc_lst:
            value = nested_lookup(doc, input_type[1])
            converted = False
            for lookup_id in saved_hits.find_left(value):
                new_doc = copy.deepcopy(doc)
                # ensure _id is always a str
                new_doc['_id'] = str(lookup_id)
                # capture debug information
                if debug:
                    new_doc['dt_debug']['start_field'] = input_type[1]
                    new_doc['dt_debug']['debug'] = saved_hits.get_debug(value)
                hit_lst.append(new_doc)
                converted = True
            if not converted:
                miss_lst.append(doc)
        self.histogram.update_io(input_type, target, len(hit_lst))
        return hit_lst, miss_lst
    def _edge_lookup(self, edge_obj, id_strct):
        """
        Follow an edge given a key.
        This method uses the data in the edge_object
        to find one key to another key using one of
        several types of lookup functions.
        """
        return edge_obj.edge_lookup(self, id_strct, self.debug)