import asyncio
import copy
import json
import math
import os
import pickle
import re
import sys
import time
from datetime import datetime
from functools import partial
from pprint import pformat
import aiocron
import biothings.utils.mongo as mongo
from biothings import config as btconfig
from biothings.hub import BUILDER_CATEGORY, UPLOADER_CATEGORY
from biothings.utils.backend import DocMongoBackend
from biothings.utils.common import (
    dotdict,
    find_classes_subclassing,
    get_random_string,
    iter_n,
    open_compressed_file,
    timesofar,
)
from biothings.utils.dataload import merge_struct
from biothings.utils.hub_db import (
    get_source_fullname,
    get_src_build,
    get_src_build_config,
    get_src_dump,
    get_src_master,
)
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseManager
from biothings.utils.mongo import doc_feeder, id_feeder
from ..databuild.backend import LinkTargetDocMongoBackend, SourceDocMongoBackend, TargetDocMongoBackend
from ..dataload.uploader import ResourceNotReady
from .backend import create_backend
from .buildconfig import AutoBuildConfig
from .mapper import TransparentMapper
logging = btconfig.logger
def pending(build_name, action_name):
    src_build = get_src_build()
    src_build.update({"_id": build_name}, {"$addToSet": {"pending": action_name}}) 
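# Illustrative sketch (not part of the hub API, names are assumptions): once a
# build named e.g. "mybuild_20210101_abcd1234" exists in src_build, post-build
# actions can be queued for the hub to pick up later:
#
#   pending("mybuild_20210101_abcd1234", "diff")
#   pending("mybuild_20210101_abcd1234", "snapshot")
#
# "$addToSet" ensures the same action isn't queued twice for a given build.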
class BuilderException(Exception):
    pass 
class ResumeException(Exception):
    pass 
class DataBuilder(object):
    """
    Generic data builder.
    """
    keep_archive = 10  # number of archived collections to keep. Oldest get dropped first.
    def __init__(
        self,
        build_name,
        source_backend,
        target_backend,
        log_folder,
        doc_root_key="root",
        mappers=None,
        default_mapper_class=TransparentMapper,
        sources=None,
        target_name=None,
        **kwargs,
    ):
        mappers = mappers or []
        self.init_state()
        self.build_name = build_name
        self.sources = sources
        self.target_name = target_name
        self._partial_source_backend = None
        self._partial_target_backend = None
        if type(source_backend) == partial:
            self._partial_source_backend = source_backend
        else:
            self._state["source_backend"] = source_backend
        if type(target_backend) == partial:
            self._partial_target_backend = target_backend
        else:
            self._state["target_backend"] = target_backend
        # doc_root_key is a key name within the src_build_config doc.
        # It's a list of datasources that are able to create a document
        # even if it doesn't exist yet. If the root documents list is not empty,
        # then any other datasource not listed there won't be able to create
        # a document, it will only be able to update existing ones.
        # If there are no root documents, any datasource can create/update a doc
        # and thus there's no priority nor limitation.
        # Note: negations can be used, like "!source1", meaning source1 is not
        # a root document datasource.
        # Useful to express: "all resources except source1"
        self.doc_root_key = doc_root_key
        # overall merge start time
        self.t0 = time.time()
        # step merge start time
        self.ti = time.time()
        self.logfile = None
        self.log_folder = log_folder
        self.mappers = {}
        self.timestamp = datetime.now()
        self.merge_stats = {}  # keep track of cnt per source, etc...
        self.src_meta = {}  # sources involved in this build (includes versions)
        self.stats = {}  # can be customized
        self.mapping = {}  # ES mapping (merged from src_master's docs)
        for mapper in mappers + [default_mapper_class()]:
            self.mappers[mapper.name] = mapper
        self.step = kwargs.get("step", 10000)
        self.prepared = False
    def init_state(self):
        self._state = {
            "logger": None,
            "source_backend": None,
            "target_backend": None,
            "build_config": None,
        } 
    @property
    def logger(self):
        if not self._state["logger"]:
            self.prepare()
        return self._state["logger"]
    @property
    def source_backend(self):
        if self._state["source_backend"] is None:
            self.prepare()
            self._state["build_config"] = self._state["source_backend"].get_build_configuration(self.build_name)
            self._state["source_backend"].validate_sources(self.sources)
        return self._state["source_backend"]
    @property
    def target_backend(self):
        if self._state["target_backend"] is None:
            self.prepare()
        return self._state["target_backend"]
    @property
    def build_config(self):
        self._state["build_config"] = self.source_backend.get_build_configuration(self.build_name)
        return self._state["build_config"]
    @logger.setter
    def logger(self, value):
        self._state["logger"] = value
    @build_config.setter
    def build_config(self, value):
        self._state["build_config"] = value
    def prepare(self, state=None):
        state = state or {}
        if self.prepared:
            return
        if state:
            # let's be explicit, _state takes what it wants
            for k in self._state:
                self._state[k] = state[k]
            return
        if self._partial_source_backend:
            self._state["source_backend"] = self._partial_source_backend()
        if self._partial_target_backend:
            self._state["target_backend"] = self._partial_target_backend()
        self.setup()
        self.setup_log()
        self.prepared = True 
    def unprepare(self):
        """
        Reset anything that's not picklable (so self can be pickled).
        Return what's been reset as a dict, so self can be restored
        once unpickled.
        """
        # TODO: use copy ?
        state = {
            "logger": self._state["logger"],
            "source_backend": self._state["source_backend"],
            "target_backend": self._state["target_backend"],
            "build_config": self._state["build_config"],
        }
        for k in state:
            self._state[k] = None
        self.prepared = False
        return state 
    def get_predicates(self):
        """
        Return a list of predicates (functions returning true/false, as in math logic)
        which instruct the job manager whether it should start a job (process/thread).
        """
        def no_uploader_running(job_manager):
            """Uploaders could change the data to be merged..."""
            num_offenders = 0
            # self.sources may not be populated at this point, use the build config instead
            sources = self.build_config.get("sources", [])
            offending_sources = set()
            for src in sources:
                src_full_name = get_source_fullname(src)
                offending_sources.add(src_full_name)
            self.logger.debug("no_uploader_running: src full names %s", offending_sources)
            for job in job_manager.jobs.values():
                if job["category"] == UPLOADER_CATEGORY:
                    if "source" in job:
                        if job["source"] in offending_sources:
                            num_offenders += 1
                            self.logger.info("%s uploader running cannot build for now", job["source"])
                    else:
                        num_offenders += 1
                        self.logger.warning(
                            "uploader with pinfo: %s running, no source info. cannot build for now", job
                        )
                else:
                    pass  # job is not an uploader
            return num_offenders == 0
        # def no_merger_running():
        #    """
        #    Mergers use cache files, if more than one running and caches need to be built
        #    both would try to write on the same cache file
        #    """
        #    return len([j for j in job_manager.jobs.values() if j["category"] == BUILDER_CATEGORY]) == 0
        return [no_uploader_running] 
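    # Illustrative sketch: a subclass could add its own predicate alongside the
    # default one, e.g. to prevent two mergers from running at the same time
    # (the commented-out no_merger_running above hints at this). The class name
    # below is an assumption, not an existing API:
    #
    #   class MyDataBuilder(DataBuilder):
    #       def get_predicates(self):
    #           def no_merger_running(job_manager):
    #               return len([j for j in job_manager.jobs.values()
    #                           if j["category"] == BUILDER_CATEGORY]) == 0
    #           return super().get_predicates() + [no_merger_running]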
    def get_pinfo(self):
        """
        Return dict containing information about the current process
        (used to report in the hub)
        """
        pinfo = {
            "category": BUILDER_CATEGORY,
            "source": "%s:%s" % (self.build_name, self.target_backend.target_name),
            "step": "",
            "description": "",
        }
        preds = self.get_predicates()
        if preds:
            pinfo["__predicates__"] = preds
        return pinfo 
    def setup_log(self):
        log_name = self.target_name or self.build_name
        log_folder = os.path.join(btconfig.LOG_FOLDER, "build", log_name) if btconfig.LOG_FOLDER else None
        self.logger, _ = get_logger("build", log_folder=log_folder, force=True) 
    def check_ready(self, force=False):
        if force:
            # don't even bother
            return
        src_build_config = self.source_backend.build_config
        src_dump = self.source_backend.dump
        _cfg = src_build_config.find_one({"_id": self.build_config["name"]})
        # check if all resources are uploaded
        for src_name in _cfg["sources"]:
            fullname = get_source_fullname(src_name)
            if not fullname:
                raise ResourceNotReady("Can't find source '%s'" % src_name)
            main_name = fullname.split(".")[0]
            src_doc = src_dump.find_one({"_id": main_name})
            if not src_doc:
                raise ResourceNotReady("Missing information for source '%s' to start merging" % src_name)
            if not src_doc.get("upload", {}).get("jobs", {}).get(src_name, {}).get("status") == "success":
                raise ResourceNotReady("No successful upload found for resource '%s'" % src_name) 
    def get_target_name(self):
        return "{}_{}_{}".format(self.build_name, self.get_build_version(), get_random_string()).lower() 
    def get_build_version(self):
        """
        Generate an arbitrary major build version. Default is a timestamp (YYYYMMDD).
        The '.' char isn't allowed in a build version as it's reserved for minor versions.
        """
        d = datetime.fromtimestamp(self.t0)
        version_fmt = self.build_config.get("build_version")
        if not version_fmt:
            version_fmt = "%Y%m%d"
        bversion = d.strftime(version_fmt)
        self.logger.info("Build version: %s", bversion)
        return bversion 
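    # Illustrative sketch: a build configuration can override the default
    # timestamp-based version by providing a strftime pattern under the
    # "build_version" key (read above), e.g. in src_build_config:
    #
    #   {"_id": "mybuild", ..., "build_version": "%y%m%d"}   # -> e.g. "210101"
    #
    # Any pattern works as long as the resulting string contains no '.'.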
    def register_status(self, status, transient=False, init=False, **extra):
        """
        Register the current build status. A build status is a record in src_build.
        The key used in this dict is the target_name. Then, any operation
        acting on this target_name is registered in a "jobs" list.
        """
        assert self.build_config, "build_config needs to be specified first"
        # get it from source_backend, kind of weird...
        src_build = self.source_backend.build
        all_sources = self.build_config.get("sources", [])
        target_name = "%s" % self.target_backend.target_name
        backend_url = self.target_backend.get_backend_url()
        build_info = {
            "_id": target_name,
            # TODO: deprecate target_backend & target_name, use backend_url instead
            "target_backend": self.target_backend.name,
            "target_name": target_name,
            "backend_url": backend_url,
            "build_config": self.build_config,
            # these are all the sources required to build the target
            # (not just the ones being processed, those are registered in jobs)
            "sources": all_sources,
        }
        job_info = {
            "status": status,
            "step_started_at": datetime.now().astimezone(),
            "logfile": self.logfile,
        }
        if transient:
            # record some "in-progress" information
            job_info["pid"] = os.getpid()
        else:
            # only register time when it's a final state
            job_info["time"] = timesofar(self.ti)
            t1 = round(time.time() - self.ti, 0)
            job_info["time_in_s"] = t1
        if "build" in extra:
            build_info.update(extra["build"])
        if "job" in extra:
            job_info.update(extra["job"])
        # create a new build entry in "build" dict if none exists
        build = src_build.find_one({"_id": target_name})
        if not build:
            # first record for target_name, keep a timestamp
            build_info["started_at"] = datetime.fromtimestamp(self.t0).astimezone()
            build_info["jobs"] = []
            src_build.insert_one(build_info)
        if init:
            # init timer for this step
            self.ti = time.time()
            src_build.update({"_id": target_name}, {"$push": {"jobs": job_info}})
            # now refresh/sync
            build = src_build.find_one({"_id": target_name})
        else:
            # merge extra at root level
            # (to keep building data...) and update the last one
            # (it's been properly created before when init=True)
            build["jobs"] and build["jobs"][-1].update(job_info)
            # build_info is common to all jobs, so we want to keep
            # any existing data (well... except if it's explicitly specified)
            def merge_build_info(target, d):
                if "__REPLACE__" in d.keys():
                    d.pop("__REPLACE__")
                    target = d
                else:
                    for k, v in d.items():
                        if type(v) == dict:
                            if k in target:
                                target[k] = merge_build_info(target[k], v)
                            else:
                                v.pop("__REPLACE__", None)
                                # merge v with "nothing" just to make sure to remove any "__REPLACE__"
                                v = merge_build_info({}, v)
                                target[k] = v
                        else:
                            target[k] = v
                return target
            build = merge_build_info(build, build_info)
            src_build.replace_one({"_id": build["_id"]}, build) 
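    # Illustrative sketch of the "__REPLACE__" convention handled by
    # merge_build_info() above. By default, nested dicts passed via extra["build"]
    # are merged into the existing build document:
    #
    #   self.register_status("success", build={"stats": {"total": 42}})
    #       -> keeps any other keys already stored under "stats"
    #   self.register_status("success", build={"stats": {"__REPLACE__": True, "total": 42}})
    #       -> replaces the whole "stats" sub-document with {"total": 42}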
    def clean_old_collections(self):
        # use target_name if given, otherwise the build name will be used
        # as the collection name prefix, so archived collections should start with it
        prefix = "%s_" % (self.target_name or self.build_name)
        db = mongo.get_target_db()
        cols = [c for c in db.collection_names() if c.startswith(prefix)]
        # timestamp is what's after _archive_, YYYYMMDD, so we can sort it safely
        cols = sorted(cols, reverse=True)
        to_drop = cols[self.keep_archive :]
        for colname in to_drop:
            self.logger.info("Cleaning old archive collection '%s'", colname)
            db[colname].drop() 
    def init_mapper(self, mapper_name):
        if self.mappers[mapper_name].need_load():
            if mapper_name is None:
                self.logger.info("Initializing default mapper")
            else:
                self.logger.info("Initializing mapper name '%s'", mapper_name)
            self.mappers[mapper_name].load() 
    def generate_document_query(self, src_name):
        return None 
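    # Illustrative sketch: a subclass can return a MongoDB filter to restrict which
    # documents from a given source take part in the merge (class/source/field
    # names below are assumptions):
    #
    #   class MyDataBuilder(DataBuilder):
    #       def generate_document_query(self, src_name):
    #           if src_name == "mysource":
    #               return {"mysource.reviewed": True}
    #           return None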
    def get_root_document_sources(self):
        root_srcs = self.build_config.get(self.doc_root_key, []) or []
        # check for "not this resource" and adjust the list
        none_root_srcs = [src.replace("!", "") for src in root_srcs if src.startswith("!")]
        if none_root_srcs:
            if len(none_root_srcs) != len(root_srcs):
                raise BuilderException(
                    "If using '!' operator, all datasources must use it (cannot mix), got: %s" % repr(root_srcs)
                )
            # ok, grab sources for this build,
            srcs = self.build_config.get("sources", [])
            root_srcs = list(set(srcs).difference(set(none_root_srcs)))
            # self.logger.info("'except root' sources %s resolves to root source = %s", repr(none_root_srcs), root_srcs)
        # resolve possible regex based source name (split-collections sources)
        root_srcs = self.resolve_sources(root_srcs)
        return root_srcs 
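    # Illustrative sketch of the doc_root_key ("root") semantics in a build
    # configuration (only "sources"/"root" keys shown, other fields omitted):
    #
    #   {"sources": ["src1", "src2", "src3"], "root": ["src1"]}
    #       -> only src1 can create new documents; src2/src3 can only update them
    #   {"sources": ["src1", "src2", "src3"], "root": ["!src3"]}
    #       -> every source except src3 is a root source (negations can't be
    #          mixed with plain names, see the check above)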
    def setup(self, sources=None, target_name=None):
        sources = sources or self.sources
        # try to get target_name from args, otherwise for now generate it
        # using mongo backend (it'll be set later during merge() call)
        target_name = target_name or self.target_name
        self.target_backend.set_target_name(self.target_name, self.build_name)
        # root key is optional but if set, it must exist in build config
        if self.doc_root_key and self.doc_root_key not in self.build_config:
            raise BuilderException("Root document key '%s' can't be found in build configuration" % self.doc_root_key) 
    def get_stats(self, sources, job_manager):
        """
        Return a dictionary of metadata for this build. It's usually app-specific
        and this method may be overridden as needed. By default though, the total
        number of documents in the merged collection is stored (key "total").
        The returned dictionary will be merged with any existing metadata in
        the src_build collection. This behavior can be changed by setting a special
        key within the metadata dict: {"__REPLACE__": True} will replace existing
        metadata with the one returned here.
        "job_manager" is passed in case parallelization is needed. Be aware
        that this method is already running in a dedicated thread; in order to
        use job_manager, the following code must be used at the very beginning
        of its implementation:
        asyncio.set_event_loop(job_manager.loop)
        """
        total = self.target_backend.target_collection.count()
        return {"total": total} 
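    # Illustrative sketch of an app-specific override, following the docstring
    # above (the "observed" field/key is an assumption):
    #
    #   class MyDataBuilder(DataBuilder):
    #       def get_stats(self, sources, job_manager):
    #           asyncio.set_event_loop(job_manager.loop)
    #           stats = super().get_stats(sources, job_manager)
    #           stats["observed"] = self.target_backend.target_collection.count_documents(
    #               {"observed": True})
    #           return stats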
    def get_mapping(self, sources):
        """
        Merge mappings from src_master
        """
        mapping = {}
        src_master = self.source_backend.master
        for collection in self.build_config["sources"]:
            meta = src_master.find_one({"_id": collection})
            if "mapping" in meta and meta["mapping"]:
                mapping = merge_struct(mapping, meta["mapping"])
            else:
                raise BuilderException('"%s" has no mapping data' % collection)
        return mapping 
    def resolve_sources(self, sources):
        """
        Source can be a string that may contain regex chars. It's useful
        when you have plenty of sub-collections prefixed with a source name.
        For instance, given a source named "blah" stored in as many collections
        as chromosomes, instead of passing each name as "blah_1", "blah_2", etc...
        "blah_.*" can be specified in build_config. This method resolves potential
        regex-based source names into real, existing collection names.
        """
        if type(sources) == str:
            sources = [sources]
        src_db = mongo.get_src_db()
        cols = src_db.collection_names()
        masters = self.source_backend.get_src_master_docs()
        found = []
        for src in sources:
            # check if master _id and name are different (meaning name is a regex)
            master = masters.get(src)
            if not master:
                raise BuilderException(
                    "'%s' could not be found in master documents (%s)" % (src, repr(list(masters.keys())))
                )
            search = src
            if master["_id"] != master["name"]:
                search = master["name"]
            # restrict pattern to minimal match
            pat = re.compile("^%s$" % search)
            for col in cols:
                if pat.match(col):
                    found.append(col)
        return found 
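    # Illustrative sketch: with per-chromosome collections "blah_1", "blah_2", ...,
    # "blah_Y" in the source database, a build configuration can simply list
    # "blah_.*" and this method expands it:
    #
    #   self.resolve_sources(["blah_.*"])   # -> ["blah_1", "blah_2", ..., "blah_Y"]
    #
    # provided a src_master document can be found for that pattern (either as its
    # _id or via its "name" field, see the master["_id"] != master["name"] check above).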
    def merge(
        self,
        sources=None,
        target_name=None,
        force=False,
        ids=None,
        steps=("merge", "post", "metadata"),
        job_manager=None,
        *args,
        **kwargs,
    ):
        """Merge given sources into a collection named target_name. If sources argument is omitted,
        all sources defined for this merger will be merged together, according to what is defined
        in src_build_config. If target_name is not defined, a unique name will be generated.
        Optional parameters:
          - force=True will bypass any safety check
          - ids: list of _ids to merge, specifically. If None, all documents are merged.
          - steps:
             * merge: actual merge step, create merged documents and store them
             * post: once merged, run an optional post-merge process
             * metadata: generate and store metadata (depends on merger, usually specifies the amount
                         of merged data, source versions, etc...)
        """
        assert job_manager
        # check what to do
        if isinstance(steps, tuple):
            steps = list(steps)  # may not be necessary, but previous steps default is a list, so let's be consistent
        elif isinstance(steps, str):
            steps = [steps]
        self.t0 = time.time()
        self.check_ready(force)
        # normalize
        avail_sources = self.build_config["sources"]
        if sources is None:
            self.target_backend.drop()
            self.target_backend.prepare()
            sources = avail_sources  # merge all
        elif isinstance(sources, str):
            sources = [sources]
        if ids is None:
            # nothing passed specifically, let's have a look at the config
            ids = self.build_config.get("ids")
            if ids:
                # config calls for a merge on specific _ids
                if type(ids) == str:
                    # path to a file
                    m = map(lambda l: l.decode().strip(), open_compressed_file(ids).readlines())
                    ids = [_id for _id in m if not _id.startswith("#")]
        orig_sources = sources
        sources = self.resolve_sources(sources)
        if not sources and "merge" in steps:
            raise BuilderException(
                "No source found, got %s while available sources are: %s" % (repr(orig_sources), repr(avail_sources))
            )
        if not target_name:
            target_name = self.get_target_name()
        self.target_name = target_name
        self.target_backend.set_target_name(self.target_name)
        self.setup_log()  # force logs to be stored in the target_name log file
        self.custom_metadata = {}
        self.clean_old_collections()
        self.logger.info("Merging into target collection '%s'", self.target_backend.target_name)
        strargs = "[sources=%s,target_name=%s]" % (sources, target_name)
        try:
            async def do():
                res = None
                if "merge" in steps or "post" in steps:
                    job = self.merge_sources(
                        source_names=sources, ids=ids, steps=steps, job_manager=job_manager, *args, **kwargs
                    )
                    res = await job
                if "metadata" in steps:
                    pinfo = self.get_pinfo()
                    pinfo["step"] = "metadata"
                    self.register_status("building", transient=True, init=True, job={"step": "metadata"})
                    postjob = await job_manager.defer_to_thread(
                        pinfo, partial(self.store_metadata, res, sources=sources, job_manager=job_manager)
                    )
                    def stored(f):
                        try:
                            nonlocal res
                            if res:
                                res = f.result()  # consume to trigger exceptions if any
                            strargs = "[sources=%s,stats=%s]" % (sources, self.merge_stats)
                            build_version = self.get_build_version()
                            if "." in build_version:
                                raise BuilderException(
                                    "Can't use '.' in build version '%s', it's reserved for minor versions"
                                    % build_version
                                )
                            # get original start dt
                            src_build = self.source_backend.build
                            build = src_build.find_one({"_id": target_name})
                            _meta = {
                                "biothing_type": build["build_config"]["doc_type"],
                                "src": self.src_meta,
                                "stats": self.stats,
                                "build_version": build_version,
                                "build_date": datetime.fromtimestamp(self.t0).astimezone().isoformat(),
                            }
                            # custom
                            _meta.update(self.custom_metadata)
                            self.register_status(
                                "success",
                                build={
                                    "merge_stats": self.merge_stats,
                                    "mapping": self.mapping,
                                    "_meta": _meta,
                                },
                            )
                            self.logger.info("success %s", strargs, extra={"notify": True})
                            # set next step
                            build_conf = AutoBuildConfig(build["build_config"])
                            if build_conf.should_diff_new_build():
                                pending(target_name, "diff")
                            if build_conf.should_snapshot_new_build():
                                pending(target_name, "snapshot")
                        except Exception as e:
                            strargs = "[sources=%s]" % sources
                            self.register_status("failed", job={"err": repr(e)})
                            self.logger.exception("failed %s: %s", strargs, e, extra={"notify": True})
                            raise
                    postjob.add_done_callback(stored)
                    await postjob
            task = asyncio.ensure_future(do())
            return task
        except (KeyboardInterrupt, Exception) as e:
            self.logger.exception(e)
            self.register_status("failed", job={"err": repr(e)})
            self.logger.exception("failed %s: %s", strargs, e, extra={"notify": True})
            raise 
    def get_mapper_for_source(self, src_name, init=True):
        # src_name can be a regex (when a source has split collections, they are merged
        # but come from the same "template" source)
        docs = self.source_backend.get_src_master_docs()
        mapper_name = None
        for master_name in docs:
            pat = re.compile("^%s$" % master_name)
            if pat.match(src_name):
                mapper_name = docs[master_name].get("mapper")
        # TODO: this could be a list
        try:
            init and self.init_mapper(mapper_name)
            mapper = self.mappers[mapper_name]
            self.logger.info("Found mapper '%s' for source '%s'", mapper, src_name)
            return mapper
        except KeyError:
            raise BuilderException("Found mapper name '%s' but no such mapper is registered" % mapper_name) 
    def merge_order(self, other_sources):
        """Optionally we can override this method to customize the order in which sources should be merged.
        Default is sorted by name.
        """
        return sorted(other_sources) 
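    # Illustrative sketch: force one source to be merged last, keeping the default
    # alphabetical order for the rest (names are assumptions):
    #
    #   class MyDataBuilder(DataBuilder):
    #       def merge_order(self, other_sources):
    #           ordered = sorted(s for s in other_sources if s != "slow_source")
    #           if "slow_source" in other_sources:
    #               ordered.append("slow_source")
    #           return ordered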
    async def merge_sources(self, source_names, steps=("merge", "post"), batch_size=100000, ids=None, job_manager=None):
        """
        Merge resources from given source_names or from the build config.
        Identify root document sources from the list to process them first.
        ids can be a list of specific document _ids to merge.
        """
        assert job_manager
        # check what to do
        if isinstance(steps, tuple):
            steps = list(steps)  # may not be necessary, but previous steps default is a list, so let's be consistent
        elif isinstance(steps, str):
            steps = [steps]
        do_merge = "merge" in steps
        do_post_merge = "post" in steps
        self.merge_stats = {}
        self.stats = {}
        self.mapping = {}
        # try to identify root document sources amongst the list to first
        # process them (if any)
        defined_root_sources = self.get_root_document_sources()
        root_sources = list(set(source_names).intersection(set(defined_root_sources)))
        other_sources = list(set(source_names).difference(set(root_sources)))
        # got root doc sources but not part of the merge ? that's weird...
        if defined_root_sources and not root_sources:
            self.logger.warning("Root document sources found (%s) but not part of the merge...", defined_root_sources)
        source_names = sorted(source_names)
        root_sources = sorted(root_sources)
        other_sources = self.merge_order(other_sources)
        self.logger.info("Sources to be merged: %s", source_names)
        self.logger.info("Root sources: %s", root_sources)
        self.logger.info("Other sources: %s", other_sources)
        got_error = False
        async def merge(src_names):
            jobs = []
            # for i, src_name in enumerate(src_names):
            for src_name in src_names:
                await asyncio.sleep(0.0)
                job = self.merge_source(src_name, batch_size=batch_size, ids=ids, job_manager=job_manager)
                job = asyncio.ensure_future(job)
                def merged(f, name, stats):
                    try:
                        res = f.result()
                        stats.update(res)
                    except Exception as e:
                        self.logger.exception("Failed merging source '%s': %s", name, e)
                        nonlocal got_error
                        got_error = e
                job.add_done_callback(partial(merged, name=src_name, stats=self.merge_stats))
                jobs.append(job)
                await asyncio.wait([job])
                # raise error as soon as we know something went wrong
                if got_error:
                    raise got_error
            tasks = asyncio.gather(*jobs)
            await tasks
        if do_merge:
            if root_sources:
                self.register_status(
                    "building", transient=True, init=True, job={"step": "merge-root", "sources": root_sources}
                )
                self.logger.info("Merging root document sources: %s", root_sources)
                await merge(root_sources)
                self.register_status("success", job={"step": "merge-root", "sources": root_sources})
            if other_sources:
                self.register_status(
                    "building", transient=True, init=True, job={"step": "merge-others", "sources": other_sources}
                )
                self.logger.info("Merging other resources: %s", other_sources)
                await merge(other_sources)
                self.register_status("success", job={"step": "merge-others", "sources": other_sources})
            self.register_status("building", transient=True, init=True, job={"step": "finalizing"})
            self.logger.info("Finalizing target backend")
            self.target_backend.finalize()
            self.register_status("success", job={"step": "finalizing"})
        else:
            self.logger.info("Skip data merging")
        if do_post_merge:
            self.logger.info("Running post-merge process")
            self.register_status("building", transient=True, init=True, job={"step": "post-merge"})
            pinfo = self.get_pinfo()
            pinfo["step"] = "post-merge"
            job = await job_manager.defer_to_thread(
                pinfo, partial(self.post_merge, source_names, batch_size, job_manager)
            )
            job = asyncio.ensure_future(job)
            def postmerged(f):
                try:
                    self.logger.info("Post-merge completed [%s]", f.result())
                    self.register_status("success", job={"step": "post-merge"})
                except Exception as e:
                    self.logger.exception("Failed post-merging source: %s", e)
                    nonlocal got_error
                    got_error = e
            job.add_done_callback(postmerged)
            await job
            if got_error:
                raise got_error
        else:
            self.logger.info("Skip post-merge process")
        await asyncio.sleep(0.0)
        return self.merge_stats 
    def document_cleaner(self, src_name, *args, **kwargs):
        """
        Return a function taking a document as argument, cleaning the doc
        as needed, and returning that doc. If no function is needed, return None.
        Note: the returned function must be picklable, so be careful with lambdas
        and closures.
        """
        return None 
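    # Illustrative sketch: since the cleaner runs in a sub-process it must be
    # picklable, so prefer a module-level function (or a functools.partial) over a
    # lambda or closure. Names below are assumptions:
    #
    #   def remove_private_keys(doc):
    #       doc.pop("_private", None)
    #       return doc
    #
    #   class MyDataBuilder(DataBuilder):
    #       def document_cleaner(self, src_name, *args, **kwargs):
    #           return remove_private_keys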
    async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=None):
        # it's actually not optional
        assert job_manager
        _query = self.generate_document_query(src_name)
        # Note: no need to check if there's an existing document with _id (we want to merge only with an existing document)
        # if the document doesn't exist then the update() call will silently fail.
        # That being said... if no root documents, then there won't be any previously inserted
        # documents, and this update() would just do nothing. So if no root docs, then upsert
        # (update or insert, but do something)
        defined_root_sources = self.get_root_document_sources()
        upsert = not defined_root_sources or src_name in defined_root_sources
        if not upsert:
            self.logger.debug(
                "Documents from source '%s' will be stored only if a previous document exists with same _id", src_name
            )
        jobs = []
        total = self.source_backend[src_name].count()
        btotal = math.ceil(total / batch_size)
        bnum = 1
        cnt = 0
        got_error = False
        # grab ids only, so we can get more, let's say 10 times more
        id_batch_size = batch_size * 10
        # FIXME id_provider initialized below will be overwritten by `if _query and ids is None:` code block
        if ids:
            self.logger.info(
                "Merging '%s' specific list of _ids, create merger job with batch_size=%d", src_name, batch_size
            )
            id_provider = [ids]
        else:
            self.logger.info(
                "Fetch _ids from '%s' with batch_size=%d, and create merger job with batch_size=%d",
                src_name,
                id_batch_size,
                batch_size,
            )
            id_provider = id_feeder(self.source_backend[src_name], batch_size=id_batch_size, logger=self.logger)
        if _query and ids is not None:
            self.logger.info("Query/filter involved, but also specific list of _ids. Ignoring query and using _ids")
        if _query and ids is None:
            self.logger.info("Query/filter involved, can't use cache to fetch _ids")
            # use doc_feeder but post-process doc to keep only the _id
            id_provider = map(
                lambda docs: [d["_id"] for d in docs],
                doc_feeder(
                    self.source_backend[src_name],
                    query=_query,
                    step=batch_size,
                    inbatch=True,
                    fields={"_id": 1},
                    logger=self.logger,
                ),
            )
        else:
            # when passing a list of _ids, IDs will be sent to the query, so we need to reduce the batch size
            id_provider = (
                ids
                and iter_n(ids, int(batch_size / 100))
                or id_feeder(self.source_backend[src_name], batch_size=id_batch_size, logger=self.logger)
            )
        src_master = self.source_backend.master
        meta = src_master.find_one({"_id": src_name}) or {}
        merger = meta.get("merger", "upsert")
        self.logger.info("Documents from source '%s' will be merged using %s", src_name, merger)
        merger_kwargs = meta.get("merger_kwargs")
        if merger_kwargs:
            self.logger.info(
                "Documents from source '%s' will be using these extra parameters during the merge %s",
                src_name,
                merger_kwargs,
            )
        doc_cleaner = self.document_cleaner(src_name)
        for big_doc_ids in id_provider:
            for doc_ids in iter_n(big_doc_ids, batch_size):
                # try to put some async here to give control back
                # (but everybody knows it's a blocking call: doc_feeder)
                await asyncio.sleep(0.1)
                cnt += len(doc_ids)
                pinfo = self.get_pinfo()
                pinfo["step"] = src_name
                pinfo["description"] = "#%d/%d (%.1f%%)" % (bnum, btotal, (cnt / total * 100))
                self.logger.info(
                    "Creating merger job #%d/%d, to process '%s' %d/%d (%.1f%%)",
                    bnum,
                    btotal,
                    src_name,
                    cnt,
                    total,
                    (cnt / total * 100.0),
                )
                job = await job_manager.defer_to_process(
                    pinfo,
                    partial(
                        merger_worker,
                        self.source_backend[src_name].name,
                        self.target_backend.target_name,
                        doc_ids,
                        self.get_mapper_for_source(src_name, init=False),
                        doc_cleaner,
                        upsert,
                        merger,
                        bnum,
                        merger_kwargs,
                    ),
                )
                def batch_merged(f, batch_num):
                    nonlocal got_error
                    if type(f.result()) != int:
                        got_error = Exception(
                            "Batch #%s failed while merging source '%s' [%s]" % (batch_num, src_name, f.result())
                        )
                job.add_done_callback(partial(batch_merged, batch_num=bnum))
                jobs.append(job)
                bnum += 1
                # raise error as soon as we know
                if got_error:
                    raise got_error
        self.logger.info("%d jobs created for merging step", len(jobs))
        tasks = asyncio.gather(*jobs)
        def done(f):
            nonlocal got_error
            if None in f.result():
                got_error = Exception("Some batches failed")
                return
            # compute overall inserted/updated records (consume result() and check summable)
            _ = sum(f.result())
        tasks.add_done_callback(done)
        await tasks
        if got_error:
            raise got_error
        else:
            return {"%s" % src_name: cnt} 
    def post_merge(self, source_names, batch_size, job_manager):
        pass 
 
class LinkDataBuilder(DataBuilder):
    """
    LinkDataBuilder creates a link to the original datasource to be merged, without
    actually copying the data (the merged collection remains empty). This builder is
    only valid when a single datasource (thus no real merge) is declared in
    the list of sources to be merged, and is useful to prevent data duplication between
    the datasource itself and the resulting merged collection.
    """
    def __init__(self, build_name, source_backend, target_backend, *args, **kwargs):
        super().__init__(build_name, source_backend, target_backend=partial(LinkTargetDocMongoBackend), *args, **kwargs)
        conf = self.source_backend.get_build_configuration(self.build_name)
        assert len(conf["sources"]) == 1, "Found more than one source to link, not allowed: %s" % conf["sources"]
        assert hasattr(self.target_backend, "datasource_name")
        self.target_backend.datasource_name = conf["sources"][0]
        self.target_backend.source_db = self.source_backend
    async def merge_source(self, src_name, *args, **kwargs):
        total = self.source_backend[src_name].count()
        return {"%s" % src_name: total} 
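    # Illustrative sketch: a LinkDataBuilder is typically selected per build
    # configuration through the "builder_class" field (see
    # BuilderManager.get_builder_class below), with exactly one source declared.
    # The class path value is an assumption about where this module lives:
    #
    #   {"_id": "mylinkedbuild",
    #    "sources": ["onesource"],
    #    "builder_class": "biothings.hub.databuild.builder.LinkDataBuilder"}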
 
def fix_batch_duplicates(docs, fail_if_struct_is_different=False):
    """
    Remove duplicates from docs based on _id. If the _id is the same but
    the structure is different (not real "duplicates", but different documents
    with the same _id), merge the docs all together (dict.update)
    or raise an error if fail_if_struct_is_different is True.
    """
    dids = {}
    # docs per _id
    for d in docs:
        dids.setdefault(d["_id"], []).append(d)
    # now check doc structure for each set of duplicates:
    # if same structure, replace with one occurrence of the docs
    # if not the same, log all the docs as a warning, and merge them all
    # as we would do if we were upserting docs one-by-one (no batch)
    # note: dicts are unhashable (no set) so either compare them to each other (n^2-ish)
    # or use json strings (let's try json...)
    for _id in dids:
        jl = set([json.dumps(e, sort_keys=True) for e in dids[_id]])
        if len(jl) > 1:
            # different structure
            if fail_if_struct_is_different:
                raise ValueError("Found duplicates with different document structure: %s" % dids[_id])
            else:
                logging.warning(
                    "Found duplicates with different document structure, merging them altogether: %s" % dids[_id]
                )
            # merge docs on top of each other
            dupdocs = dids[_id]
            merged = {}
            for d in dupdocs:
                merged.update(d)
            dids[_id] = merged
        else:
            assert len(jl) == 1
            # normalize to scalar
            dids[_id] = dids[_id][0]
    return list(dids.values()) 
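# Illustrative sketch of the two cases handled above:
#
#   fix_batch_duplicates([{"_id": "a", "x": 1}, {"_id": "a", "x": 1}])
#       -> [{"_id": "a", "x": 1}]                    # identical copies collapsed
#   fix_batch_duplicates([{"_id": "a", "x": 1}, {"_id": "a", "y": 2}])
#       -> [{"_id": "a", "x": 1, "y": 2}]            # different structures merged
#
# With fail_if_struct_is_different=True, the second call raises ValueError
# instead of merging.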
def merger_worker(col_name, dest_name, ids, mapper, cleaner, upsert, merger, batch_num, merger_kwargs=None):
    try:
        src = mongo.get_src_db()
        tgt = mongo.get_target_db()
        col = src[col_name]
        dest = DocMongoBackend(tgt, tgt[dest_name])
        cur = doc_feeder(col, step=len(ids), inbatch=False, query={"_id": {"$in": ids}})
        if cleaner:
            cur = map(cleaner, cur)
        mapper.load()
        docs = [d for d in mapper.process(cur)]
        # while documents from cursor "cur" are unique, at this point, due to the use of
        # a mapper, documents can be converted and there can now be duplicates (same _id)
        # (ex: mygene, ensembl -> entrez conversion). "docs" could produce a duplicate key error
        # within the batch, so we need to remove duplicates.
        all_ids = [d["_id"] for d in docs]
        uniq_ids = set(all_ids)
        if len(all_ids) != len(uniq_ids):
            logging.warning("Found duplicated IDs within batch, trying to fix")
            docs = fix_batch_duplicates(docs)
        if merger == "merge_struct":
            stored_docs = dest.mget_from_ids([d["_id"] for d in docs])
            ddocs = dict([(d["_id"], d) for d in docs])
            if merger_kwargs:
                for d in stored_docs:
                    # Merge the old document in mongodb into the new document
                    ddocs[d["_id"]] = merge_struct(ddocs[d["_id"]], d, **merger_kwargs)
            else:
                for d in stored_docs:
                    ddocs[d["_id"]] = merge_struct(d, ddocs[d["_id"]])
            docs = list(ddocs.values())
        cnt = dest.update(docs, upsert=upsert)
        return cnt
    except Exception as e:
        logger_name = "build_%s_%s_batch_%s" % (dest_name, col_name, batch_num)
        logger, _ = get_logger(logger_name, btconfig.LOG_FOLDER)
        logger.exception(e)
        logger.error(
            "col_name: %s, dest_name: %s, ids: see pickle, " % (col_name, dest_name)
            + "mapper: %s, cleaner: %s, upsert: %s, " % (mapper, cleaner, upsert)
            + "merger: %s, batch_num: %s" % (merger, batch_num)
        )
        exc_fn = os.path.join(btconfig.LOG_FOLDER, "%s.exc.pick" % logger_name)
        pickle.dump(e, open(exc_fn, "wb"))
        logger.info("Exception was dumped in pickle file '%s'", exc_fn)
        ids_fn = os.path.join(btconfig.LOG_FOLDER, "%s.ids.pick" % logger_name)
        pickle.dump(ids, open(ids_fn, "wb"))
        logger.info("IDs dumped in pickle file '%s'", ids_fn)
        dat_fn = os.path.join(btconfig.LOG_FOLDER, "%s.docs.pick" % logger_name)
        pickle.dump(docs, open(dat_fn, "wb"))
        logger.info("Data (batch of docs) dumped in pickle file '%s'", dat_fn)
        raise 
def set_pending_to_build(conf_name=None):
    src_build_config = get_src_build_config()
    qfilter = {}
    if conf_name:
        qfilter = {"_id": conf_name}
    logging.info(
        "Setting pending_to_build flag for configuration(s): %s" % (conf_name and conf_name or "all configurations")
    )
    src_build_config.update(qfilter, {"$addToSet": {"pending": "build"}}) 
class BuilderManager(BaseManager):
    def __init__(
        self,
        source_backend_factory=None,
        target_backend_factory=None,
        builder_class=None,
        poll_schedule=None,
        *args,
        **kwargs,
    ):
        """
        BuilderManager deals with the different builders used to merge datasources.
        It is connected to src_build() via sync(), where it grabs build information
        and registers builder classes, ready to be instantiated when triggering builds.
        source_backend_factory can be an optional factory function (like a partial) that
        a builder can call without any argument to generate a SourceBackend.
        Same for target_backend_factory for the TargetBackend. builder_class, if given,
        will be used as the actual Builder class used for the merge and will be passed
        the same arguments as the base DataBuilder. It can also be a list of classes, in which
        case the first one is used as the default, when it's necessary to define multiple builders.
        """
        super(BuilderManager, self).__init__(*args, **kwargs)
        self.src_build_config = get_src_build_config()
        self.source_backend_factory = source_backend_factory
        self.target_backend_factory = target_backend_factory
        builder_class = builder_class or DataBuilder
        if isinstance(builder_class, list):
            self.arg_builder_classes = builder_class
        else:
            self.arg_builder_classes = [builder_class]
        self.default_builder_class = self.arg_builder_classes[0] or DataBuilder
        self.builder_classes = {}
        self.poll_schedule = poll_schedule
        self.setup_log()
    def clean_stale_status(self):
        src_build = get_src_build()
        for build in src_build.find():
            dirty = False
            for job in build.get("jobs", []):
                if job.get("status") == "building":
                    logging.warning("Found stale build '%s', marking build status as 'canceled'" % build["_id"])
                    job["status"] = "canceled"
                    dirty = True
            if dirty:
                src_build.replace_one({"_id": build["_id"]}, build) 
    @property
    def source_backend(self):
        source_backend = (
            self.source_backend_factory
            and self.source_backend_factory()
            or partial(
                SourceDocMongoBackend,
                build_config=partial(get_src_build_config),
                build=partial(get_src_build),
                master=partial(get_src_master),
                dump=partial(get_src_dump),
                sources=partial(mongo.get_src_db),
            )
        )
        return source_backend
    @property
    def target_backend(self):
        target_backend = (
            self.target_backend_factory
            and self.target_backend_factory()
            or partial(TargetDocMongoBackend, target_db=partial(mongo.get_target_db))
        )
        return target_backend
    def get_builder_class(self, build_config_name):
        """
        The builder class can be specified in different ways (in order of precedence):
        1. within the build_config document (so, per configuration)
        2. or defined in the builder manager (so, per manager)
        3. or default to DataBuilder
        """
        builder_class = None
        conf = self.src_build_config.find_one({"_id": build_config_name})
        if conf.get("builder_class"):
            builder_class = self.builder_classes[conf["builder_class"]]["class"]
        elif self.default_builder_class:
            builder_class = self.default_builder_class
        else:
            builder_class = DataBuilder
        return builder_class 
    def register_builder(self, build_name):
        # will use partial to postpone object creation and their db connections,
        # as we don't want to keep connections alive for an undetermined amount of time
        # declare source backend
        def create(build_name):
            # postpone config import so the app has had time to set it up
            # before actual call time
            from biothings import config
            # assemble the whole
            klass = self.get_builder_class(build_name)
            self.logger.info("Build config '%s' will use builder class %s", build_name, klass)
            bdr = klass(
                build_name,
                source_backend=self.source_backend,
                target_backend=self.target_backend,
                log_folder=config.LOG_FOLDER,
            )
            return bdr
        self.register[build_name] = partial(create, build_name) 
    def get_builder(self, col_name):
        doc = get_src_build().find_one({"_id": col_name})
        if not doc:
            raise BuilderException("No such build named '%s'" % repr(col_name))
        assert "build_config" in doc, "Expecting build_config information"
        klass = self.get_builder_class(doc["build_config"]["name"])
        bdr = klass(
            doc["build_config"]["name"],
            source_backend=self.source_backend,
            target_backend=self.target_backend,
            log_folder=btconfig.LOG_FOLDER,
        )
        # overwrite with existing values
        bdr.build_config = doc["build_config"]
        bdr.target_backend.set_target_name(col_name)
        return bdr 
    def delete_merged_data(self, merge_name):
        target_db = mongo.get_target_db()
        col = target_db[merge_name]
        col.drop() 
    def delete_merge(self, merge_name):
        """Delete merged collections and associated metadata"""
        db = get_src_build()
        meta = db.find_one({"_id": merge_name})
        if meta:
            db.remove({"_id": merge_name})
        else:
            self.logger.warning("No metadata found for merged collection '%s'", merge_name)
        self.delete_merged_data(merge_name) 
    def archive_merge(self, merge_name):
        """Archive a merged collection: mark its metadata with an "archived" timestamp and delete the merged data"""
        db = get_src_build()
        meta = db.find_one({"_id": merge_name})
        if meta:
            meta["archived"] = datetime.now()
            db.replace_one({"_id": merge_name}, meta)
        else:
            self.logger.warning("No metadata found for merged collection '%s'", merge_name)
        self.delete_merged_data(merge_name) 
    def get_query_for_list_merge(self, only_archived, status=None):
        q = {"archived": {"$exists": 0}}
        if only_archived:
            q = {"archived": {"$exists": 1}}
        if status:
            if status == "success":
                q["jobs.status"] = {"$not": {"$in": ["failed", "canceled"]}}
            elif status in ["failed", "canceled"]:
                q["jobs.status"] = status
        return q 
    def list_merge(self, build_config=None, only_archived=False):
        q = self.get_query_for_list_merge(only_archived)
        docs = get_src_build().find(q)
        by_confs = {}
        for d in docs:
            by_confs.setdefault(d.get("build_config", {}).get("name", None), []).append(d["_id"])
        if build_config:
            return sorted(by_confs.get(build_config, []))
        else:
            for conf in by_confs:
                by_confs[conf] = sorted(by_confs[conf])
            return by_confs 
    def setup_log(self):
        self.logger, self.logfile = get_logger("buildmanager") 
    def __getitem__(self, build_name):
        """
        Return an instance of a builder for the build named 'build_name'
        Note: each call returns a different instance (factory call behind the scenes...)
        """
        # we'll get a partial class but will return an instance
        pclass = BaseManager.__getitem__(self, build_name)
        return pclass()
    def resolve_builder_class(self, klass):
        """
        Resolve class/partial definition to (obj,"type","mod.class")
        where names (class name, module, docstring, etc...) can
        directly be accessed whether it's a standard class or not
        """
        obj = klass
        if type(klass) == partial:
            assert type(klass.func) == type
            btype = "partial"
            obj = klass.func
        elif type(klass) == type:
            btype = "class"
        else:
            raise TypeError("Unknown type for builder %s" % repr(klass))
        modstr = obj.__module__
        classstr = obj.__name__
        classpathstr = "%s.%s" % (modstr, classstr)
        return (obj, btype, classpathstr) 
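    # Illustrative sketch (the module path shown is an assumption about where
    # this file lives):
    #
    #   self.resolve_builder_class(DataBuilder)
    #       -> (DataBuilder, "class", "biothings.hub.databuild.builder.DataBuilder")
    #   self.resolve_builder_class(partial(DataBuilder, doc_root_key="root"))
    #       -> (DataBuilder, "partial", "biothings.hub.databuild.builder.DataBuilder")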
    def find_builder_classes(self):
        """
        Find all available builder classes:
         1. classes passed during manager init (builder_class)
            (that includes the default builder)
         2. all classes subclassing DataBuilder in:
           a. biothings.hub.databuild.*
           b. hub.databuild.* (app-specific)
        """
        bclasses = set(self.arg_builder_classes)
        mods = [sys.modules[__name__]]
        try:
            import hub.databuild as m
            mods.append(m)
        except ImportError:
            pass
        for klass in find_classes_subclassing(mods, DataBuilder):
            bclasses.add(klass)
        for klass in bclasses:
            try:
                obj, btype, classpathstr = self.resolve_builder_class(klass)
                helpstr = obj.__doc__ and " ".join(map(str.strip, obj.__doc__.splitlines()))
                self.builder_classes[classpathstr] = {
                    "desc": helpstr,
                    "type": btype,
                    "class": klass,
                    "default": klass == self.default_builder_class,
                }
            except Exception as e:
                logging.exception("Can't extract information from builder class %s: %s" % (repr(klass), e)) 
    def merge(self, build_name, sources=None, target_name=None, **kwargs):
        """
        Trigger a merge for the build named 'build_name'. An optional list of sources can be
        passed (a single one or a list). target_name is the target collection name used
        to store the merged data. If None, each call will generate a unique target_name.
        """
        try:
            bdr = self[build_name]
            job = bdr.merge(sources, target_name, job_manager=self.job_manager, **kwargs)
            return job
        except KeyError:
            raise BuilderException("No such builder for '%s'" % build_name)
        except ResourceNotReady as e:
            raise BuilderException(f"Some datasources aren't ready for the merge: {e}") 
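    # Illustrative usage sketch (build/source names are assumptions; builders are
    # normally registered beforehand, e.g. via register_builder()/sync()):
    #
    #   bm = BuilderManager(job_manager=job_manager)
    #   bm.register_builder("mybuild")
    #   task = bm.merge("mybuild")                             # merge all configured sources
    #   task = bm.merge("mybuild", sources=["src1", "src2"])   # or just a subset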
    def list_sources(self, build_name):
        """
        List all registered sources used to trigger a build named 'build_name'
        """
        info = self.src_build_config.find_one({"_id": build_name})
        return info and info["sources"] or [] 
    def whatsnew(self, build_name=None, old=None):
        """
        Return datasources which have changed since last time
        (last time is datasource information from metadata, either from
        given old src_build doc name, or the latest found if old=None)
        """
        dbbuild = get_src_build()
        dbdump = get_src_dump()
        def whatsnewcomparedto(build_name, old=None):
            if old is None:
                # TODO: this will get big... but needs to be generic
                # because we handle different hub db backends (or it needs to be a
                # specific helper func defined for all backends)
                # FIXME: this gets slower as the hub accumulates builds: we are
                #  fetching all builds of all build configs whenever /whatsnew is
                #  requested
                builds = dbbuild.find({"build_config.name": build_name})
                builds = sorted(builds, key=lambda e: e["started_at"], reverse=True)
                if not builds:
                    raise BuilderException(f"Can't find a build associated to config '{build_name}'")
                # Pick up the latest successful build
                old = None
                for _build in builds:
                    if _build.get("_meta", {}).get("src"):
                        old = _build
                        break
                if not old:
                    raise BuilderException(f"Can't find a success build associated to config '{build_name}'")
            else:
                old = dbbuild.find_one({"_id": old})
            meta_srcs = old.get("_meta", {}).get("src", {})
            new = {
                "old_build": {
                    "name": old["_id"],
                    "built_at": old["started_at"],
                },
                "sources": {},
            }
            for src_name, data in meta_srcs.items():
                try:
                    srcd = dbdump.find_one({"_id": src_name})
                    try:
                        if srcd and srcd.get("download"):
                            if not srcd["download"]["status"] == "success":
                                srcd = None
                        if srcd and srcd.get("upload"):
                            for sub in srcd["upload"]["jobs"].values():
                                if not sub["status"] == "success":
                                    srcd = None
                                    break
                    except KeyError as e:
                        self.logger.warning("whatsnew: src_dump:%s missing keys: %s, not touching", src_name, e)
                    if srcd and not srcd.get("download") and srcd.get("upload"):
                        # this is a collection-only source: find all releases in sub-sources (they are expected to all be the same)
                        rels = [sub["release"] for sub in srcd["upload"]["jobs"].values()]
                        srels = set(rels)
                        if len(srels) != 1:
                            raise ValueError(
                                "Found different releases in sub-sources, expected only one common: %s" % repr(rels)
                            )
                        rel = rels[0]
                        if data.get("version") and rel != data["version"]:
                            new["sources"][src_name] = {"old": {"version": data["version"]}, "new": {"version": rel}}
                    elif (
                        srcd
                        and srcd.get("download", {}).get("release")
                        and srcd["download"]["release"] != data["version"]
                    ):
                        new["sources"][src_name] = {
                            "old": {"version": data["version"]},
                            "new": {
                                "version": srcd["download"]["release"],
                                "downloaded_at": srcd["download"].get("started_at"),
                            },
                        }
                except Exception as e:
                    self.logger.warning("Can't check what's new for source '%s': %s", src_name, e)
            return {build_name: new}
        if old is None and build_name is None:
            # do this for all build configs
            dbbuildconfig = get_src_build_config()
            configs = {}
            for d in dbbuildconfig.find(
                {
                    "$or": [
                        {"archived": {"$exists": 0}},
                        {"archived": False},
                    ]
                }
            ):
                try:
                    news = whatsnewcomparedto(d["_id"])
                    if news[d["_id"]]["sources"]:
                        configs.update(news)
                except BuilderException:
                    continue
            return configs
        else:
            return whatsnewcomparedto(build_name, old) 
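    # Hedged sketch of the structure returned by whatsnew() for a single build
    # config (keys taken from the code above; values are illustrative assumptions):
    #
    #   {"mybuild": {
    #        "old_build": {"name": "mybuild_20240101_abcdef", "built_at": <datetime>},
    #        "sources": {
    #            "src1": {"old": {"version": "2024-01-01"},
    #                     "new": {"version": "2024-02-01", "downloaded_at": <datetime>}},
    #        },
    #   }}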
[docs]
    def clean_temp_collections(self, build_name, date=None, prefix=""):
        """
        Delete all target collections created from builder named
        "build_name" at given date (or any date is none given -- carefull...).
        Date is a string (YYYYMMDD or regex)
        Common collection name prefix can also be specified if needed.
        """
        target_db = mongo.get_target_db()
        for col_name in target_db.collection_names():
            search = prefix and prefix + "_" or ""
            search += build_name + "_"
            search += date and date + "_" or ""
            pat = re.compile(search)
            if pat.match(col_name) and "current" not in col_name:
                logging.info("Dropping target collection '%s'" % col_name)
                target_db[col_name].drop() 
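    # Hedged usage sketch (assumed manager instance and names): drop temporary
    # target collections produced by the "mybuild" builder on a given day.
    #
    #   manager.clean_temp_collections("mybuild", date="20240101")
    #   # or with a common prefix and no date restriction (careful, matches all dates):
    #   manager.clean_temp_collections("mybuild", prefix="tmp")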
[docs]
    def poll(self):
        """
        Check "whatsnew()" to idenfity builds which could be automatically built,
        if {"autobuild" : {...}} is part of the build configuration. "autobuild" contains
        a dict with "schedule" (aiocron/crontab format), so each build configuration can
        have a different polling schedule.
        """
        # don't use $exists in find(), not all hub db backends implement it
        logger, _ = get_logger("autobuild")
        schedules = {
            conf["_id"]: conf["autobuild"]["schedule"]
            for conf in get_src_build_config().find()
            if conf.get("autobuild", {}).get("schedule")
        }
        async def _autobuild(conf_name):
            new = self.whatsnew(conf_name)
            logger.info(f"{conf_name}:{schedules[conf_name]}")
            logger.info(f"{conf_name}:{new}")
            if new[conf_name]["sources"]:
                self.merge(conf_name)
                logger.info(f"{conf_name}:merge(*)")
            else:
                logger.info(f"{conf_name}:pass")
        logger.info(datetime.now().astimezone())
        logger.info(schedules)  # all schedules
        for _id, _schedule in schedules.items():
            try:
                aiocron.crontab(_schedule, func=partial(_autobuild, _id), start=True, loop=self.job_manager.loop)
            except Exception:
                logger.exception((_id, _schedule)) 
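    # Hedged sketch (assumption): a build configuration document enabling autobuild,
    # matching the "autobuild.schedule" key read by poll() above. The schedule
    # below ("every day at 2am") is purely illustrative.
    #
    #   {
    #       "_id": "mybuild",
    #       "name": "mybuild",
    #       "doc_type": "gene",
    #       "sources": ["src1", "src2"],
    #       "autobuild": {"schedule": "0 2 * * *"},  # aiocron/crontab format
    #   }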
[docs]
    def trigger_merge(self, doc):
        return self.merge(doc["_id"]) 
[docs]
    def build_config_info(self):
        configs = {}
        err = None
        for name in self.register:
            try:
                builder = self[name]
            except Exception as e:
                conf = get_src_build_config().find_one({"_id": name})
                if conf:
                    builder = dotdict({"build_config": conf})
                else:
                    builder = None
                err = str(e)
            if (
                not builder
                or issubclass(builder.target_backend.__class__, LinkTargetDocMongoBackend)
                or issubclass(builder.__class__, dict)
            ):  # fake builder obj
                target_db = None  # it's not a traditional target database, it points
                # somewhere else (TODO: maybe LinkTargetDocMongoBackend should
                # implement more methods to return info about that)
            else:
                target_db = builder.target_backend.target_collection.database.client.address
            configs[name] = {
                "build_config": builder and builder.build_config,
                "archived": "archived" in (builder and builder.build_config or []),
            }
            if builder and builder.source_backend:
                configs[name]["source_backend"] = {
                    "type": builder and builder.source_backend.__class__.__name__,
                    "source_db": builder and builder.source_backend.sources.client.address,
                }
            if builder and builder.target_backend:
                configs[name]["target_backend"] = {
                    "type": builder and builder.target_backend.__class__.__name__,
                    "target_db": target_db,
                }
            if err:
                configs[name]["error"] = err
            if builder and builder.mappers:
                configs[name]["mapper"] = {}
                for mappername, mapper in builder.mappers.items():
                    configs[name]["mapper"][mappername] = mapper.__class__.__name__
        res = {"build_configs": configs}
        # dict contains an actual class, non-serializable, so adjust:
        bclasses = copy.deepcopy(self.builder_classes)
        for _k, v in bclasses.items():
            v.pop("class")
        res["builder_classes"] = bclasses
        return res 
[docs]
    def build_info(
        self,
        id=None,
        conf_name=None,
        fields=None,
        only_archived=False,
        status=None,
    ):
        """
        Return build information for a given build _id, or for all builds
        if id is None. "fields" can be passed to select which fields to
        return (mongo projection notation); if None, return everything except:
         - "mapping" (too long)
        If id is None, more fields are filtered out:
         - "sources" and some of "build_config"
        only_archived=True will return archived merges only.
        status: return only successful/failed builds; can be "success" or "failed".
        """
        res = {}
        q = self.get_query_for_list_merge(only_archived=only_archived, status=status)
        if id is not None:
            q = {"_id": id}
        else:
            fields = {}
            fields["mapping"] = 0
            fields["sources"] = 0
            fields["build_config.sources"] = 0
            fields["build_config.root"] = 0
        if conf_name is not None:
            q["build_config._id"] = conf_name
        builds = [b for b in get_src_build().find(q, fields)]
        res = [b for b in sorted(builds, key=lambda e: str(e.get("started_at") or ""), reverse=True)]
        # set a global status (ie. latest job's status)
        # + get total #docs
        db = mongo.get_target_db()
        for b in res:
            jobs = b.get("jobs", [])
            b["status"] = "unknown"
            if jobs:
                b["status"] = jobs[-1]["status"]
            try:
                backend = create_backend(b["backend_url"])
                b["count"] = backend.count()
            except KeyError:
                b["count"] = db[b["_id"]].count()
        if id:
            if res:
                return res.pop()
            else:
                raise ValueError("No such build named '%s'" % id)
        else:
            return res 
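    # Hedged usage sketch (assumed manager instance and names):
    #
    #   # summary of all successful builds for a given configuration
    #   builds = manager.build_info(conf_name="mybuild", status="success")
    #   # full detail (including "sources") for one specific build
    #   build = manager.build_info(id="mybuild_20240101_abcdef")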
[docs]
    def upsert_build_conf(self, name, doc_type, sources, roots, builder_class, params, archived):
        col = get_src_build_config()
        builder_class = builder_class or self.resolve_builder_class(self.default_builder_class)[2]  # class path string
        doc = {
            "_id": name,
            "name": name,
            "doc_type": doc_type,
            "sources": sources,
            "root": roots,
            "builder_class": builder_class,
        }
        if archived:
            doc["archived"] = True
        else:
            doc.pop("archived", None)
        doc.update(params)
        col.save(doc)
        self.configure() 
[docs]
    def create_build_configuration(
        self,
        name,
        doc_type,
        sources,
        roots=None,
        builder_class=None,
        params=None,
        archived=False,
    ):
        roots = roots or []
        params = params or {}
        col = get_src_build_config()
        # check conf doesn't exist yet
        if [d for d in col.find({"_id": name})]:
            raise ValueError("Configuration named '%s' already exists" % name)
        self.upsert_build_conf(name, doc_type, sources, roots, builder_class, params, archived) 
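    # Hedged usage sketch (assumed manager instance and names): create a
    # configuration where "src1" is a root-document source and "src2" may only
    # update existing documents.
    #
    #   manager.create_build_configuration(
    #       name="mybuild",
    #       doc_type="gene",
    #       sources=["src1", "src2"],
    #       roots=["src1"],
    #   )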
[docs]
    def update_build_configuration(
        self,
        name,
        doc_type,
        sources,
        roots=None,
        builder_class=None,
        params=None,
        archived=False,
    ):
        roots = roots or []
        params = params or {}
        self.upsert_build_conf(name, doc_type, sources, roots, builder_class, params, archived) 
[docs]
    def delete_build_configuration(self, name):
        col = get_src_build_config()
        col.remove({"_id": name})
        self.configure() 
[docs]
    def save_mapping(self, name, mapping=None, dest="build", mode="mapping"):
        logging.debug("Saving mapping for build '%s' destination='%s':\n%s", name, dest, pformat(mapping))
        src_build = get_src_build()
        m = src_build.find_one({"_id": name})
        assert m, "Can't find build document for '%s'" % name
        # either given a fully qualified source or just sub-source
        if dest == "build":
            m["mapping"] = mapping
            src_build.save(m)
        elif dest == "inspect":
            try:
                m["inspect"]["results"][mode] = mapping
                src_build.save(m)
            except KeyError:
                raise ValueError("Can't save mapping, document doesn't contain expected inspection data")
        else:
            raise ValueError("Unknow saving destination: %s" % repr(dest))