import asyncio
import copy
import json
import math
import os
import pickle
import re
import sys
import time
from datetime import datetime
from functools import partial
from pprint import pformat
import aiocron
import biothings.utils.mongo as mongo
from biothings import config as btconfig
from biothings.hub import BUILDER_CATEGORY, UPLOADER_CATEGORY
from biothings.utils.backend import DocMongoBackend
from biothings.utils.common import (
dotdict,
find_classes_subclassing,
get_random_string,
iter_n,
open_compressed_file,
timesofar,
)
from biothings.utils.dataload import merge_struct
from biothings.utils.hub_db import (
get_source_fullname,
get_src_build,
get_src_build_config,
get_src_dump,
get_src_master,
)
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseManager
from biothings.utils.mongo import doc_feeder, id_feeder
from ..databuild.backend import LinkTargetDocMongoBackend, SourceDocMongoBackend, TargetDocMongoBackend
from ..dataload.uploader import ResourceNotReady
from .backend import create_backend
from .buildconfig import AutoBuildConfig
from .mapper import TransparentMapper
# Module-level logger taken from the hub configuration. Note that it is bound
# to the name `logging`: in this module `logging.warning(...)` calls that
# configured logger object.
logging = btconfig.logger
[docs]
def pending(build_name, action_name):
    """
    Flag `action_name` as pending for the build identified by `build_name`.

    The action name is added to the "pending" set of the matching src_build
    document (using $addToSet, so an action is listed at most once).
    """
    selector = {"_id": build_name}
    change = {"$addToSet": {"pending": action_name}}
    get_src_build().update(selector, change)
[docs]
class BuilderException(Exception):
    """Raised by the builders of this module when a build can't be set up or run."""
[docs]
class ResumeException(Exception):
    """
    Dedicated exception type for resume-related failures.

    NOTE(review): nothing raises it in the visible part of this file; confirm
    its intended usage against the callers.
    """
[docs]
class DataBuilder(object):
    """
    Generic data builder.

    Merges documents coming from the sources listed in a build configuration
    into a target backend, and registers the status of each step in src_build.
    """

    keep_archive = 10  # number of archived collections to keep. Oldest get dropped first.
def __init__(
    self,
    build_name,
    source_backend,
    target_backend,
    log_folder,
    doc_root_key="root",
    mappers=None,
    default_mapper_class=TransparentMapper,
    sources=None,
    target_name=None,
    **kwargs,
):
    """
    Create a builder for the build configuration named `build_name`.

    Args:
        build_name: name of the build configuration this builder works on.
        source_backend: either a ready-to-use source backend, or a
            `functools.partial` creating it (then called lazily by prepare()).
        target_backend: same as `source_backend`, for the target side.
        log_folder: stored as `self.log_folder`.
        doc_root_key: key name within the build configuration listing the
            root document sources (see comment below).
        mappers: optional iterable of mapper objects, registered by their
            `name` attribute.
        default_mapper_class: class instantiated (without argument) and
            registered along with `mappers`.
        sources: optional sources, stored as `self.sources`.
        target_name: optional target name, stored as `self.target_name`.
        **kwargs: only "step" is read here (default 10000).
    """
    self.init_state()
    self.build_name = build_name
    self.sources = sources
    self.target_name = target_name
    self._partial_source_backend = None
    self._partial_target_backend = None
    # a partial is a backend factory, to be called later (see prepare());
    # anything else is considered a usable backend and stored right away
    if isinstance(source_backend, partial):
        self._partial_source_backend = source_backend
    else:
        self._state["source_backend"] = source_backend
    if isinstance(target_backend, partial):
        self._partial_target_backend = target_backend
    else:
        self._state["target_backend"] = target_backend
    # doc_root_key is a key name within the src_build_config doc.
    # It's a list of datasources that are able to create a document
    # even if it doesn't exist. If the root documents list is not empty, then
    # any other datasource not listed there won't be able to create
    # a document, it will only be able to update it.
    # If there are no root documents, any datasource can create/update a doc
    # and thus there's no priority nor limitation.
    # Note: negations can be used, like "!source1", meaning source1 is not
    # a root document datasource.
    # Useful to express "all resources except source1".
    self.doc_root_key = doc_root_key
    # overall merge start time
    self.t0 = time.time()
    # step merge start time
    self.ti = time.time()
    self.logfile = None
    self.log_folder = log_folder
    self.mappers = {}
    self.timestamp = datetime.now()
    self.merge_stats = {}  # keep track of cnt per source, etc...
    self.src_meta = {}  # sources involved in this build (includes versions)
    self.stats = {}  # can be customized
    self.mapping = {}  # ES mapping (merged from src_master's docs)
    # list() so that any iterable of mappers (tuple, ...) is accepted
    for mapper in list(mappers or []) + [default_mapper_class()]:
        self.mappers[mapper.name] = mapper
    self.step = kwargs.get("step", 10000)
    self.prepared = False
[docs]
def init_state(self):
self._state = {
"logger": None,
"source_backend": None,
"target_backend": None,
"build_config": None,
}
@property
def logger(self):
if not self._state["logger"]:
self.prepare()
return self._state["logger"]
@property
def source_backend(self):
if self._state["source_backend"] is None:
self.prepare()
self._state["build_config"] = self._state["source_backend"].get_build_configuration(self.build_name)
self._state["source_backend"].validate_sources(self.sources)
return self._state["source_backend"]
@property
def target_backend(self):
if self._state["target_backend"] is None:
self.prepare()
return self._state["target_backend"]
@property
def build_config(self):
self._state["build_config"] = self.source_backend.get_build_configuration(self.build_name)
return self._state["build_config"]
@logger.setter
def logger(self, value):
self._state["logger"] = value
@build_config.setter
def build_config(self, value):
self._state["build_config"] = value
[docs]
def prepare(self, state=None):
state = state or {}
if self.prepared:
return
if state:
# let's be explicit, _state takes what it wants
for k in self._state:
self._state[k] = state[k]
return
if self._partial_source_backend:
self._state["source_backend"] = self._partial_source_backend()
if self._partial_target_backend:
self._state["target_backend"] = self._partial_target_backend()
self.setup()
self.setup_log()
self.prepared = True
[docs]
def unprepare(self):
"""
reset anything that's not pickable (so self can be pickled)
return what's been reset as a dict, so self can be restored
once pickled
"""
# TODO: use copy ?
state = {
"logger": self._state["logger"],
"source_backend": self._state["source_backend"],
"target_backend": self._state["target_backend"],
"build_config": self._state["build_config"],
}
for k in state:
self._state[k] = None
self.prepared = False
return state
[docs]
def get_predicates(self):
    """
    Return a list of predicates (functions returning true/false, as in math logic)
    which instruct/dictate if the job manager should start a job (process/thread).
    """

    def no_uploader_running(job_manager):
        """Uploaders could change the data to be merged..."""
        # self.sources is not populated. thanks.
        configured = self.build_config.get("sources", [])
        offending_sources = {get_source_fullname(name) for name in configured}
        self.logger.debug("no_uploader_running: src full names %s", offending_sources)
        num_offenders = 0
        for job in job_manager.jobs.values():
            if job["category"] != UPLOADER_CATEGORY:
                continue  # job is not an uploader
            if "source" not in job:
                # can't tell which source is being uploaded: be safe
                num_offenders += 1
                self.logger.warning(
                    "uploader with pinfo: %s running, no source info. cannot build for now", job
                )
            elif job["source"] in offending_sources:
                num_offenders += 1
                self.logger.info("%s uploader running cannot build for now", job["source"])
        return num_offenders == 0

    # def no_merger_running():
    #     """
    #     Mergers use cache files, if more than one running and caches need to be built
    #     both would try to write on the same cache file
    #     """
    #     return len([j for j in job_manager.jobs.values() if j["category"] == BUILDER_CATEGORY]) == 0
    return [no_uploader_running]
[docs]
def get_pinfo(self):
    """
    Return a dict containing information about the current process
    (used to report in the hub). Predicates, if any, are attached under
    the "__predicates__" key.
    """
    source_label = "%s:%s" % (self.build_name, self.target_backend.target_name)
    pinfo = dict(category=BUILDER_CATEGORY, source=source_label, step="", description="")
    predicates = self.get_predicates()
    if predicates:
        pinfo["__predicates__"] = predicates
    return pinfo
[docs]
def setup_log(self):
    """
    (Re)create the "build" logger and store it as `self.logger`.

    Logs go to <LOG_FOLDER>/build/<target_name or build_name> when
    btconfig.LOG_FOLDER is set; otherwise no log folder is passed.
    """
    name = self.target_name or self.build_name
    if btconfig.LOG_FOLDER:
        folder = os.path.join(btconfig.LOG_FOLDER, "build", name)
    else:
        folder = None
    build_logger, _ = get_logger("build", log_folder=folder, force=True)
    self.logger = build_logger
[docs]
def check_ready(self, force=False):
    """
    Check that every source of the build configuration has been uploaded.

    Raises ResourceNotReady when a source is unknown, has no src_dump
    document, or has no upload job with status "success".
    With force=True nothing is checked.
    """
    if force:
        # don't even bother
        return
    src_build_config = self.source_backend.build_config
    src_dump = self.source_backend.dump
    _cfg = src_build_config.find_one({"_id": self.build_config["name"]})
    # check if all resources are uploaded
    for src_name in _cfg["sources"]:
        fullname = get_source_fullname(src_name)
        if not fullname:
            raise ResourceNotReady("Can't find source '%s'" % src_name)
        # the main source name is what comes before the first "."
        main_name, _, _ = fullname.partition(".")
        src_doc = src_dump.find_one({"_id": main_name})
        if not src_doc:
            raise ResourceNotReady("Missing information for source '%s' to start merging" % src_name)
        upload_jobs = src_doc.get("upload", {}).get("jobs", {})
        upload_status = upload_jobs.get(src_name, {}).get("status")
        if upload_status != "success":
            raise ResourceNotReady("No successful upload found for resource '%s'" % src_name)
[docs]
def get_target_name(self):
    """Generate a lower-cased target name: <build_name>_<build_version>_<random string>."""
    parts = (self.build_name, self.get_build_version(), get_random_string())
    return "{}_{}_{}".format(*parts).lower()
[docs]
def get_build_version(self):
"""
Generate an arbitrary major build version. Default is using a timestamp (YYMMDD)
'.' char isn't allowed in build version as it's reserved for minor versions
"""
d = datetime.fromtimestamp(self.t0)
version_fmt = self.build_config.get("build_version")
if not version_fmt:
version_fmt = "%Y%m%d"
bversion = d.strftime(version_fmt)
self.logger.info("Build version: %s", bversion)
return bversion
[docs]
def register_status(self, status, transient=False, init=False, **extra):
    """
    Register current build status. A build status is a record in src_build.
    The key used in this dict is the target_name. Then, any operation
    acting on this target_name is registered in a "jobs" list.

    Args:
        status: value stored under "status" in the job record.
        transient: when True, the job record gets the current process id
            ("pid"); otherwise it gets the time elapsed since the step
            started ("time" and "time_in_s").
        init: when True, the step timer is restarted and a new job record is
            pushed to the "jobs" list. Otherwise the last job record is
            updated and the build information is merged into the stored one.
        **extra: optional "build" and "job" dicts, merged respectively into
            the build information and the job record.
    """
    assert self.build_config, "build_config needs to be specified first"
    # get it from source_backend, kind of weird...
    src_build = self.source_backend.build
    all_sources = self.build_config.get("sources", [])
    target_name = "%s" % self.target_backend.target_name
    backend_url = self.target_backend.get_backend_url()
    build_info = {
        "_id": target_name,
        # TODO: deprecate target_backend & target_name, use backend_url instead
        "target_backend": self.target_backend.name,
        "target_name": target_name,
        "backend_url": backend_url,
        "build_config": self.build_config,
        # these are all the sources required to build target
        # (not just the ones being processed, those are registered in jobs)
        "sources": all_sources,
    }
    job_info = {
        "status": status,
        "step_started_at": datetime.now().astimezone(),
        "logfile": self.logfile,
    }
    if transient:
        # record some "in-progress" information
        job_info["pid"] = os.getpid()
    else:
        # only register time when it's a final state
        job_info["time"] = timesofar(self.ti)
        t1 = round(time.time() - self.ti, 0)
        job_info["time_in_s"] = t1
    if "build" in extra:
        build_info.update(extra["build"])
    if "job" in extra:
        job_info.update(extra["job"])
    # create a new build entry in "build" dict if none exists
    build = src_build.find_one({"_id": target_name})
    if not build:
        # first record for target_name, keep a timestamp
        build_info["started_at"] = datetime.fromtimestamp(self.t0).astimezone()
        build_info["jobs"] = []
        src_build.insert_one(build_info)
    if init:
        # init timer for this step
        self.ti = time.time()
        src_build.update({"_id": target_name}, {"$push": {"jobs": job_info}})
        # now refresh/sync
        build = src_build.find_one({"_id": target_name})
    else:
        # merge extra at root level
        # (to keep building data...) and update the last one
        # (it's been properly created before when init=True)
        # NOTE(review): `build` isn't read again after insert_one() above, so this
        # branch relies on the record already existing - confirm against callers.
        build["jobs"] and build["jobs"][-1].update(job_info)

        # build_info is common to all jobs, so we want to keep
        # any existing data (well... except if it's explicitly specified)
        def merge_build_info(target, d):
            # recursively merge `d` into `target`; a dict holding the special
            # "__REPLACE__" key replaces the target instead of being merged
            if "__REPLACE__" in d.keys():
                d.pop("__REPLACE__")
                target = d
            else:
                for k, v in d.items():
                    if type(v) == dict:
                        if k in target:
                            target[k] = merge_build_info(target[k], v)
                        else:
                            v.pop("__REPLACE__", None)
                            # merge v with "nothing" just to make sure to remove any "__REPLACE__"
                            v = merge_build_info({}, v)
                            target[k] = v
                    else:
                        target[k] = v
            return target

        build = merge_build_info(build, build_info)
        src_build.replace_one({"_id": build["_id"]}, build)
[docs]
def clean_old_collections(self):
    """
    Drop the oldest collections of the target database whose name starts with
    "<target_name or build_name>_", keeping the `keep_archive` first ones
    once sorted by name in reverse order.
    """
    # use target_name if given, otherwise build name will be used
    # as collection name prefix, so they should start like that
    prefix = "%s_" % (self.target_name or self.build_name)
    db = mongo.get_target_db()
    # timestamp is what's after _archive_, YYYYMMDD, so we can sort it safely
    candidates = sorted(
        (name for name in db.collection_names() if name.startswith(prefix)),
        reverse=True,
    )
    for colname in candidates[self.keep_archive :]:
        self.logger.info("Cleaning old archive collection '%s'", colname)
        db[colname].drop()
[docs]
def init_mapper(self, mapper_name):
if self.mappers[mapper_name].need_load():
if mapper_name is None:
self.logger.info("Initializing default mapper")
else:
self.logger.info("Initializing mapper name '%s'", mapper_name)
self.mappers[mapper_name].load()
[docs]
def generate_document_query(self, src_name):
    """
    Return the query used to select the documents of `src_name` to merge.

    This implementation selects nothing in particular and returns None.
    """
    return None
[docs]
def get_root_document_sources(self):
    """
    Return the list of root document sources, as existing collection names.

    Root sources are read from the build configuration under the
    `doc_root_key` key. Entries can all be negated ("!source1"), in which
    case root sources are all the sources of the build but the negated ones.

    Raises:
        BuilderException: if negated and non-negated entries are mixed.
    """
    # build_config is fetched again on each access, read it once
    build_config = self.build_config
    root_srcs = build_config.get(self.doc_root_key, []) or []
    # check for "not this resource" and adjust the list
    none_root_srcs = [src.replace("!", "") for src in root_srcs if src.startswith("!")]
    if none_root_srcs:
        if len(none_root_srcs) != len(root_srcs):
            # format the message here: exceptions don't interpolate extra
            # arguments the way logging calls do
            raise BuilderException(
                "If using '!' operator, all datasources must use it (cannot mix), got: %s" % repr(root_srcs)
            )
        # ok, grab sources for this build,
        srcs = build_config.get("sources", [])
        root_srcs = list(set(srcs).difference(set(none_root_srcs)))
        # self.logger.info("'except root' sources %s resolves to root source = %s", repr(none_root_srcs), root_srcs)
    # resolve possible regex based source name (split-collections sources)
    root_srcs = self.resolve_sources(root_srcs)
    return root_srcs
[docs]
def setup(self, sources=None, target_name=None):
sources = sources or self.sources
# try to get target_name from args, otherwise for now generate it
# using mongo backend (it'll be set later during merge() call)
target_name = target_name or self.target_name
self.target_backend.set_target_name(self.target_name, self.build_name)
# root key is optional but if set, it must exist in build config
if self.doc_root_key and self.doc_root_key not in self.build_config:
raise BuilderException("Root document key '%s' can't be found in build configuration" % self.doc_root_key)
[docs]
def get_stats(self, sources, job_manager):
"""
Return a dictionnary of metadata for this build. It's usually app-specific
and this method may be overridden as needed. By default though, the total
number of documents in the merged collection is stored (key "total")
Return dictionary will be merged with any existing metadata in
src_build collection. This behavior can be changed by setting a special
key within metadata dict: {"__REPLACE__" : True} will... replace existing
metadata with the one returned here.
"job_manager" is passed in case parallelization is needed. Be aware
that this method is already running in a dedicated thread, in order to
use job_manager, the following code must be used at the very beginning
of its implementation:
asyncio.set_event_loop(job_manager.loop)
"""
total = self.target_backend.target_collection.count()
return {"total": total}
[docs]
def get_mapping(self, sources):
    """
    Merge mappings from src_master.

    The mapping of every source listed in the build configuration is merged
    (with merge_struct) into a single dict, which is returned. Note the
    `sources` argument isn't used: sources come from the build configuration.

    Raises:
        BuilderException: if a source has no master document, or no (or an
            empty) "mapping" in it.
    """
    mapping = {}
    src_master = self.source_backend.master
    for collection in self.build_config["sources"]:
        meta = src_master.find_one({"_id": collection})
        # find_one() gives None when there's no master document: report it
        # the same way as a missing mapping instead of failing on None
        if not meta or not meta.get("mapping"):
            raise BuilderException('"%s" has no mapping data' % collection)
        mapping = merge_struct(mapping, meta["mapping"])
    return mapping
[docs]
def resolve_sources(self, sources):
    """
    Resolve source names into real, existing collection names.

    A source can be a string that may contain regex chars. It's useful
    when you have plenty of sub-collections prefixed with a source name.
    For instance, given a source named "blah" stored in as many collections
    as chromosomes, instead of passing each name as "blah_1", "blah_2", etc...
    "blah_.*" can be specified in build_config.

    Raises BuilderException if a source has no master document.
    """
    if type(sources) is str:
        sources = [sources]
    existing = mongo.get_src_db().collection_names()
    masters = self.source_backend.get_src_master_docs()
    resolved = []
    for src in sources:
        master = masters.get(src)
        if not master:
            raise BuilderException(
                "'%s'could not be found in master documents (%s)" % (src, repr(list(masters.keys())))
            )
        # master _id and name being different means name is a regex
        search = src if master["_id"] == master["name"] else master["name"]
        # restrict pattern to minimal match
        pat = re.compile("^%s$" % search)
        resolved.extend(col for col in existing if pat.match(col))
    return resolved
[docs]
def merge(
    self,
    sources=None,
    target_name=None,
    force=False,
    ids=None,
    steps=("merge", "post", "metadata"),
    job_manager=None,
    *args,
    **kwargs,
):
    """Merge given sources into a collection named target_name. If sources argument is omitted,
    all sources defined for this merger will be merged together, according to what is defined
    in src_build_config. If target_name is not defined, a unique name will be generated.

    Optional parameters:
      - force=True will bypass any safety check
      - ids: list of _ids to merge, specifically. If None, all documents are merged.
      - steps:
         * merge: actual merge step, create merged documents and store them
         * post: once merged, run optional post-merge process
         * metadata: generate and store metadata (depends on merger, usually specifies the amount
                     of merged data, source versions, etc...)

    job_manager is mandatory. Extra *args/**kwargs are passed to merge_sources().
    The work is scheduled on the event loop and the resulting asyncio task is returned.
    Raises BuilderException when the "merge" step is requested but no source could be resolved.
    """
    assert job_manager
    # check what to do
    if isinstance(steps, tuple):
        steps = list(steps)  # may not be necessary, but previous steps default is a list, so let's be consistent
    elif isinstance(steps, str):
        steps = [steps]
    self.t0 = time.time()
    self.check_ready(force)
    # normalize
    avail_sources = self.build_config["sources"]
    if sources is None:
        # full merge: start from a clean target
        self.target_backend.drop()
        self.target_backend.prepare()
        sources = avail_sources  # merge all
    elif isinstance(sources, str):
        sources = [sources]
    if ids is None:
        # nothing passed specifically, let's have a look at the config
        ids = self.build_config.get("ids")
        if ids:
            # config calls for a merge on specific _ids
            if type(ids) == str:
                # path to a file: one _id per line, lines starting with "#" are skipped
                m = map(lambda l: l.decode().strip(), open_compressed_file(ids).readlines())
                ids = [_id for _id in m if not _id.startswith("#")]
    orig_sources = sources
    sources = self.resolve_sources(sources)
    if not sources and "merge" in steps:
        raise BuilderException(
            "No source found, got %s while available sources are: %s" % (repr(orig_sources), repr(avail_sources))
        )
    if not target_name:
        target_name = self.get_target_name()
    self.target_name = target_name
    self.target_backend.set_target_name(self.target_name)
    self.setup_log()  # Force logs will be stored in the target_name file
    self.custom_metadata = {}
    self.clean_old_collections()
    self.logger.info("Merging into target collection '%s'", self.target_backend.target_name)
    strargs = "[sources=%s,target_name=%s]" % (sources, target_name)
    try:

        async def do():
            # res holds what merge_sources() returned, None if that step is skipped
            res = None
            if "merge" in steps or "post" in steps:
                job = self.merge_sources(
                    source_names=sources, ids=ids, steps=steps, job_manager=job_manager, *args, **kwargs
                )
                res = await job
            if "metadata" in steps:
                pinfo = self.get_pinfo()
                pinfo["step"] = "metadata"
                self.register_status("building", transient=True, init=True, job={"step": "metadata"})
                # NOTE(review): store_metadata isn't defined in the visible part of this file
                postjob = await job_manager.defer_to_thread(
                    pinfo, partial(self.store_metadata, res, sources=sources, job_manager=job_manager)
                )

                def stored(f):
                    # done-callback of the metadata job: register the final
                    # status and flag the next steps as pending
                    try:
                        nonlocal res
                        if res:
                            res = f.result()  # consume to trigger exceptions if any
                        strargs = "[sources=%s,stats=%s]" % (sources, self.merge_stats)
                        build_version = self.get_build_version()
                        if "." in build_version:
                            raise BuilderException(
                                "Can't use '.' in build version '%s', it's reserved for minor versions"
                                % build_version
                            )
                        # get original start dt
                        src_build = self.source_backend.build
                        build = src_build.find_one({"_id": target_name})
                        _meta = {
                            "biothing_type": build["build_config"]["doc_type"],
                            "src": self.src_meta,
                            "stats": self.stats,
                            "build_version": build_version,
                            "build_date": datetime.fromtimestamp(self.t0).astimezone().isoformat(),
                        }
                        # custom
                        _meta.update(self.custom_metadata)
                        self.register_status(
                            "success",
                            build={
                                "merge_stats": self.merge_stats,
                                "mapping": self.mapping,
                                "_meta": _meta,
                            },
                        )
                        self.logger.info("success %s", strargs, extra={"notify": True})
                        # set next step
                        build_conf = AutoBuildConfig(build["build_config"])
                        if build_conf.should_diff_new_build():
                            pending(target_name, "diff")
                        if build_conf.should_snapshot_new_build():
                            pending(target_name, "snapshot")
                    except Exception as e:
                        strargs = "[sources=%s]" % sources
                        self.register_status("failed", job={"err": repr(e)})
                        self.logger.exception("failed %s: %s", strargs, e, extra={"notify": True})
                        raise

                postjob.add_done_callback(stored)
                await postjob

        task = asyncio.ensure_future(do())
        return task
    except (KeyboardInterrupt, Exception) as e:
        self.logger.exception(e)
        self.register_status("failed", job={"err": repr(e)})
        self.logger.exception("failed %s: %s", strargs, e, extra={"notify": True})
        raise
[docs]
def get_mapper_for_source(self, src_name, init=True):
    """
    Return the mapper registered for the source `src_name`.

    The mapper name is read from the "mapper" key of the master document
    matching `src_name`; when it's None, the mapper registered under None
    is used. With init=True the mapper is initialized first (see init_mapper()).

    Raises:
        BuilderException: if no mapper is registered under the name found.
    """
    # master document names are used as regex patterns matched against src_name
    # (when a source has split collections, they are merged but
    # come from the same "template" source)
    docs = self.source_backend.get_src_master_docs()
    mapper_name = None
    for master_name in docs:
        pat = re.compile("^%s$" % master_name)
        if pat.match(src_name):
            mapper_name = docs[master_name].get("mapper")
    # TODO: this could be a list
    try:
        init and self.init_mapper(mapper_name)
        mapper = self.mappers[mapper_name]
        self.logger.info("Found mapper '%s' for source '%s'", mapper, src_name)
        return mapper
    except KeyError:
        raise BuilderException("Found mapper named '%s' but no mapper associated" % mapper_name)
[docs]
def merge_order(self, other_sources):
    """
    Return `other_sources` in the order they should be merged.

    Sources are sorted by name by default; this method can be overridden
    to customize that order.
    """
    ordered = list(other_sources)
    ordered.sort()
    return ordered
[docs]
async def merge_sources(self, source_names, steps=("merge", "post"), batch_size=100000, ids=None, job_manager=None):
    """
    Merge resources from given source_names or from build config.
    Identify root document sources from the list to first process them.
    ids can be a list of documents to be merged in particular.

    Depending on `steps`, the "merge" step (root sources first, then the
    other ones, then target backend finalization) and/or the "post" step
    (post_merge() run in a thread) are performed, each one being registered
    with register_status(). job_manager is mandatory.

    Returns the merge statistics (`self.merge_stats`); the first error met
    while merging or post-merging is raised.
    """
    assert job_manager
    # check what to do
    if isinstance(steps, tuple):
        steps = list(steps)  # may not be necessary, but previous steps default is a list, so let's be consistent
    elif isinstance(steps, str):
        steps = [steps]
    do_merge = "merge" in steps
    do_post_merge = "post" in steps
    self.merge_stats = {}
    self.stats = {}
    self.mapping = {}
    # try to identify root document sources amongst the list to first
    # process them (if any)
    defined_root_sources = self.get_root_document_sources()
    root_sources = list(set(source_names).intersection(set(defined_root_sources)))
    other_sources = list(set(source_names).difference(set(root_sources)))
    # got root doc sources but not part of the merge ? that's weird...
    if defined_root_sources and not root_sources:
        self.logger.warning("Root document sources found (%s) but not part of the merge...", defined_root_sources)
    source_names = sorted(source_names)
    root_sources = sorted(root_sources)
    other_sources = self.merge_order(other_sources)
    self.logger.info("Sources to be merged: %s", source_names)
    self.logger.info("Root sources: %s", root_sources)
    self.logger.info("Other sources: %s", other_sources)
    # set by the callbacks below to the exception they caught, if any
    got_error = False

    async def merge(src_names):
        # merge the given sources one after the other
        jobs = []
        # for i, src_name in enumerate(src_names):
        for src_name in src_names:
            await asyncio.sleep(0.0)
            job = self.merge_source(src_name, batch_size=batch_size, ids=ids, job_manager=job_manager)
            job = asyncio.ensure_future(job)

            def merged(f, name, stats):
                # done-callback: keep the source's stats, or remember the error
                try:
                    res = f.result()
                    stats.update(res)
                except Exception as e:
                    self.logger.exception("Failed merging source '%s': %s", name, e)
                    nonlocal got_error
                    got_error = e

            job.add_done_callback(partial(merged, name=src_name, stats=self.merge_stats))
            jobs.append(job)
            # wait for this source to be done before moving to the next one
            await asyncio.wait([job])
            # raise error as soon as we know something went wrong
            if got_error:
                raise got_error
        tasks = asyncio.gather(*jobs)
        await tasks

    if do_merge:
        if root_sources:
            self.register_status(
                "building", transient=True, init=True, job={"step": "merge-root", "sources": root_sources}
            )
            self.logger.info("Merging root document sources: %s", root_sources)
            await merge(root_sources)
            self.register_status("success", job={"step": "merge-root", "sources": root_sources})
        if other_sources:
            self.register_status(
                "building", transient=True, init=True, job={"step": "merge-others", "sources": other_sources}
            )
            self.logger.info("Merging other resources: %s", other_sources)
            await merge(other_sources)
            self.register_status("success", job={"step": "merge-others", "sources": other_sources})
        self.register_status("building", transient=True, init=True, job={"step": "finalizing"})
        self.logger.info("Finalizing target backend")
        self.target_backend.finalize()
        self.register_status("success", job={"step": "finalizing"})
    else:
        self.logger.info("Skip data merging")
    if do_post_merge:
        self.logger.info("Running post-merge process")
        self.register_status("building", transient=True, init=True, job={"step": "post-merge"})
        pinfo = self.get_pinfo()
        pinfo["step"] = "post-merge"
        job = await job_manager.defer_to_thread(
            pinfo, partial(self.post_merge, source_names, batch_size, job_manager)
        )
        job = asyncio.ensure_future(job)

        def postmerged(f):
            # done-callback: register the success, or remember the error
            try:
                self.logger.info("Post-merge completed [%s]", f.result())
                self.register_status("success", job={"step": "post-merge"})
            except Exception as e:
                self.logger.exception("Failed post-merging source: %s", e)
                nonlocal got_error
                got_error = e

        job.add_done_callback(postmerged)
        await job
        if got_error:
            raise got_error
    else:
        self.logger.info("Skip post-merge process")
    await asyncio.sleep(0.0)
    return self.merge_stats
[docs]
def document_cleaner(self, src_name, *args, **kwargs):
    """
    Return the function used to clean documents of `src_name`, if any.

    Such a function takes a document as argument, cleans it as needed and
    returns it. It must be pickleable (careful with lambdas and closures).
    No cleaning is needed here, so None is returned.
    """
    return None
[docs]
async def merge_source(self, src_name, batch_size=100000, ids=None, job_manager=None):
    """
    Merge documents from source `src_name` into the target collection.

    _ids are fetched (or taken from `ids` when given) and split in batches
    of at most `batch_size`; one merger_worker job is deferred to a process
    per batch, through `job_manager` (mandatory).

    Returns a dict {src_name: number of _ids submitted for merging}.
    An exception is raised if a batch doesn't return an int.
    """
    # it's actually not optional
    assert job_manager
    _query = self.generate_document_query(src_name)
    # Note: no need to check if there's an existing document with _id (we want to merge only with an existing document)
    # if the document doesn't exist then the update() call will silently fail.
    # That being said... if no root documents, then there won't be any previously inserted
    # documents, and this update() would just do nothing. So if no root docs, then upsert
    # (update or insert, but do something)
    defined_root_sources = self.get_root_document_sources()
    upsert = not defined_root_sources or src_name in defined_root_sources
    if not upsert:
        self.logger.debug(
            "Documents from source '%s' will be stored only if a previous document exists with same _id", src_name
        )
    jobs = []
    total = self.source_backend[src_name].count()
    btotal = math.ceil(total / batch_size)
    bnum = 1
    cnt = 0
    # set by the callbacks below to an exception when a batch failed
    got_error = False
    # grab ids only, so we can get more, let's say 10 times more
    id_batch_size = batch_size * 10
    # FIXME id_provider initialized below will be overwritten by `if _query and ids is None:` code block
    if ids:
        self.logger.info(
            "Merging '%s' specific list of _ids, create merger job with batch_size=%d", src_name, batch_size
        )
        id_provider = [ids]
    else:
        self.logger.info(
            "Fetch _ids from '%s' with batch_size=%d, and create merger job with batch_size=%d",
            src_name,
            id_batch_size,
            batch_size,
        )
        id_provider = id_feeder(self.source_backend[src_name], batch_size=id_batch_size, logger=self.logger)
    if _query and ids is not None:
        self.logger.info("Query/filter involved, but also specific list of _ids. Ignoring query and use _ids")
    if _query and ids is None:
        self.logger.info("Query/filter involved, can't use cache to fetch _ids")
        # use doc_feeder but post-process doc to keep only the _id
        id_provider = map(
            lambda docs: [d["_id"] for d in docs],
            doc_feeder(
                self.source_backend[src_name],
                query=_query,
                step=batch_size,
                inbatch=True,
                fields={"_id": 1},
                logger=self.logger,
            ),
        )
    else:
        # when passing a list of _ids, IDs will be sent to the query, so we need to reduce the batch size
        id_provider = (
            ids
            and iter_n(ids, int(batch_size / 100))
            or id_feeder(self.source_backend[src_name], batch_size=id_batch_size, logger=self.logger)
        )
    src_master = self.source_backend.master
    meta = src_master.find_one({"_id": src_name}) or {}
    merger = meta.get("merger", "upsert")
    self.logger.info("Documents from source '%s' will be merged using %s", src_name, merger)
    merger_kwargs = meta.get("merger_kwargs")
    if merger_kwargs:
        self.logger.info(
            "Documents from source '%s' will be using these extra parameters during the merge %s",
            src_name,
            merger_kwargs,
        )
    doc_cleaner = self.document_cleaner(src_name)
    for big_doc_ids in id_provider:
        for doc_ids in iter_n(big_doc_ids, batch_size):
            # try to put some async here to give control back
            # (but everybody knows it's a blocking call: doc_feeder)
            await asyncio.sleep(0.1)
            cnt += len(doc_ids)
            pinfo = self.get_pinfo()
            pinfo["step"] = src_name
            # NOTE(review): `total` is the whole collection count, even when specific
            # ids are merged, and is used as a divisor here - confirm it can't be 0
            pinfo["description"] = "#%d/%d (%.1f%%)" % (bnum, btotal, (cnt / total * 100))
            self.logger.info(
                "Creating merger job #%d/%d, to process '%s' %d/%d (%.1f%%)",
                bnum,
                btotal,
                src_name,
                cnt,
                total,
                (cnt / total * 100.0),
            )
            job = await job_manager.defer_to_process(
                pinfo,
                partial(
                    merger_worker,
                    self.source_backend[src_name].name,
                    self.target_backend.target_name,
                    doc_ids,
                    self.get_mapper_for_source(src_name, init=False),
                    doc_cleaner,
                    upsert,
                    merger,
                    bnum,
                    merger_kwargs,
                ),
            )

            def batch_merged(f, batch_num):
                # done-callback: a batch is expected to return an int
                nonlocal got_error
                if type(f.result()) != int:
                    got_error = Exception(
                        "Batch #%s failed while merging source '%s' [%s]" % (batch_num, src_name, f.result())
                    )

            job.add_done_callback(partial(batch_merged, batch_num=bnum))
            jobs.append(job)
            bnum += 1
            # raise error as soon as we know
            if got_error:
                raise got_error
    self.logger.info("%d jobs created for merging step", len(jobs))
    tasks = asyncio.gather(*jobs)

    def done(f):
        # done-callback of the gathered batches
        nonlocal got_error
        if None in f.result():
            got_error = Exception("Some batches failed")
            return
        # compute overall inserted/updated records (consume result() and check summable)
        _ = sum(f.result())

    tasks.add_done_callback(done)
    await tasks
    if got_error:
        raise got_error
    else:
        return {"%s" % src_name: cnt}
[docs]
def post_merge(self, source_names, batch_size, job_manager):
    """
    Hook run during the "post" step, once sources have been merged.

    Nothing is done here; meant to be overridden as needed.
    """
    return None
[docs]
class LinkDataBuilder(DataBuilder):
    """
    LinkDataBuilder creates a link to the original datasource to be merged, without
    actually copying the data (merged collection remains empty). This builder is
    only valid when exactly one datasource (thus no real merge) is declared in
    the list of sources to be merged, and is useful to prevent data duplication between
    the datasource itself and the resulting merged collection.
    """

    def __init__(self, build_name, source_backend, target_backend, *args, **kwargs):
        # whatever target backend is passed, a link target backend is used instead
        link_backend = partial(LinkTargetDocMongoBackend)
        super().__init__(build_name, source_backend, target_backend=link_backend, *args, **kwargs)
        conf = self.source_backend.get_build_configuration(self.build_name)
        assert len(conf["sources"]) == 1, "Found more than one source to link, not allowed: %s" % conf["sources"]
        assert hasattr(self.target_backend, "datasource_name")
        # tell the target backend which datasource it links to
        linked_source = conf["sources"][0]
        self.target_backend.datasource_name = linked_source
        self.target_backend.source_db = self.source_backend

    async def merge_source(self, src_name, *args, **kwargs):
        """Nothing to merge (data is linked): only report the number of documents in the source."""
        total = self.source_backend[src_name].count()
        key = "%s" % src_name
        return {key: total}
[docs]
def fix_batch_duplicates(docs, fail_if_struct_is_different=False):
    """
    Remove duplicates from docs based on _id. If _id's the same but
    structure is different (not real "duplicates", but different documents
    with the same _ids), merge docs all together (dict.update)
    or raise an error if fail_if_struct_is_different.

    Args:
        docs: iterable of documents (dicts with an "_id" key).
        fail_if_struct_is_different: raise instead of merging when documents
            sharing an _id are different.

    Returns:
        A list with one document per _id, in order of first appearance.

    Raises:
        ValueError: if fail_if_struct_is_different is set and documents
            sharing an _id are different.
    """
    # docs per _id
    by_id = {}
    for doc in docs:
        by_id.setdefault(doc["_id"], []).append(doc)
    # now check doc structure for each duplicates
    # if same structure, replace with one occurrence of the docs
    # if not the same, log all the docs as warning, and merge them all
    # as we would do if we were upserting doc one-by-one (no batch)
    fixed = []
    for dupdocs in by_id.values():
        # note: dicts are unhashable (no set), so compare their json strings.
        # default=repr is deliberate: the strings are only used for comparison,
        # and values json can't serialize (datetime, ...) must not fail the batch
        signatures = {json.dumps(d, sort_keys=True, default=repr) for d in dupdocs}
        if len(signatures) == 1:
            # real duplicates, normalize to scalar
            fixed.append(dupdocs[0])
            continue
        # different structure
        if fail_if_struct_is_different:
            raise ValueError("Found duplicated with different document structure: %s" % dupdocs)
        logging.warning(
            "Found duplicated with different document structure, merging them altogether: %s", dupdocs
        )
        # merge docs on top of each other
        merged = {}
        for d in dupdocs:
            merged.update(d)
        fixed.append(merged)
    return fixed
[docs]
def merger_worker(col_name, dest_name, ids, mapper, cleaner, upsert, merger, batch_num, merger_kwargs=None):
    """
    Merge one batch of documents from source collection `col_name` into
    target collection `dest_name`.

    Documents whose _id is in `ids` are fetched, optionally passed through
    `cleaner`, then through `mapper`. Duplicated _ids produced by the mapper
    are fixed with fix_batch_duplicates. When `merger` is "merge_struct", the
    documents already stored in the target are merged with the new ones
    (argument order depends on whether `merger_kwargs` is given). The batch is
    finally written with the target backend's update(), whose result is returned.

    On any exception, the exception, the ids and the documents processed so
    far are pickled in btconfig.LOG_FOLDER, details are logged in a dedicated
    logger, and the exception is re-raised.
    """
    # Bound before the try block so the error handler can always dump it, even
    # when the failure happens before the batch was built (it is then None).
    docs = None
    try:
        src = mongo.get_src_db()
        tgt = mongo.get_target_db()
        col = src[col_name]
        dest = DocMongoBackend(tgt, tgt[dest_name])
        cur = doc_feeder(col, step=len(ids), inbatch=False, query={"_id": {"$in": ids}})
        if cleaner:
            cur = map(cleaner, cur)
        mapper.load()
        docs = list(mapper.process(cur))
        # while documents from cursor "cur" are unique, at this point, due to the use
        # of a mapper, documents can be converted and there now can be duplicates (same _id)
        # (ex: mygene, ensembl -> entrez conversion). "docs" could produce a duplicated error
        # within the batch, so we need to remove duplicates.
        all_ids = [d["_id"] for d in docs]
        if len(all_ids) != len(set(all_ids)):
            logging.warning("Found duplicated IDs within batch, trying to fix")
            docs = fix_batch_duplicates(docs)
        if merger == "merge_struct":
            stored_docs = dest.mget_from_ids([d["_id"] for d in docs])
            ddocs = {d["_id"]: d for d in docs}
            if merger_kwargs:
                for d in stored_docs:
                    # Merge the old document in mongodb into the new document
                    ddocs[d["_id"]] = merge_struct(ddocs[d["_id"]], d, **merger_kwargs)
            else:
                for d in stored_docs:
                    ddocs[d["_id"]] = merge_struct(d, ddocs[d["_id"]])
            docs = list(ddocs.values())
        cnt = dest.update(docs, upsert=upsert)
        return cnt
    except Exception as e:
        logger_name = "build_%s_%s_batch_%s" % (dest_name, col_name, batch_num)
        logger, _ = get_logger(logger_name, btconfig.LOG_FOLDER)
        logger.exception(e)
        logger.error(
            "col_name: %s, dest_name: %s, ids: see pickle, " % (col_name, dest_name)
            + "mapper: %s, cleaner: %s, upsert: %s, " % (mapper, cleaner, upsert)
            + "merger: %s, batch_num: %s" % (merger, batch_num)
        )
        # context managers make sure each dump file is flushed and closed
        exc_fn = os.path.join(btconfig.LOG_FOLDER, "%s.exc.pick" % logger_name)
        with open(exc_fn, "wb") as exc_file:
            pickle.dump(e, exc_file)
        logger.info("Exception was dumped in pickle file '%s'", exc_fn)
        ids_fn = os.path.join(btconfig.LOG_FOLDER, "%s.ids.pick" % logger_name)
        with open(ids_fn, "wb") as ids_file:
            pickle.dump(ids, ids_file)
        logger.info("IDs dumped in pickle file '%s'", ids_fn)
        dat_fn = os.path.join(btconfig.LOG_FOLDER, "%s.docs.pick" % logger_name)
        with open(dat_fn, "wb") as dat_file:
            pickle.dump(docs, dat_file)
        logger.info("Data (batch of docs) dumped in pickle file '%s'", dat_fn)
        raise
[docs]
def set_pending_to_build(conf_name=None):
    """
    Add "build" to the "pending" set of build configuration(s).

    When `conf_name` is given the update is filtered on that _id, otherwise
    an empty filter is passed to the build configuration collection.
    """
    build_configs = get_src_build_config()
    selector = {"_id": conf_name} if conf_name else {}
    target_label = conf_name or "all configuraitons"
    logging.info("Setting pending_to_build flag for configuration(s): %s" % target_label)
    build_configs.update(selector, {"$addToSet": {"pending": "build"}})
[docs]
class BuilderManager(BaseManager):
def __init__(
    self,
    source_backend_factory=None,
    target_backend_factory=None,
    builder_class=None,
    poll_schedule=None,
    *args,
    **kwargs,
):
    """
    BuilderManager handles the builders used to merge datasources.

    source_backend_factory and target_backend_factory are optional callables
    (such as partials) invoked without argument to produce the source and
    target backends handed to builders. builder_class is the builder class
    used for merges; it may also be a list of classes, in which case the
    first one is the default. When omitted, DataBuilder is used.
    poll_schedule is stored as-is on the manager. Remaining arguments are
    forwarded to the parent manager.
    """
    super(BuilderManager, self).__init__(*args, **kwargs)
    self.src_build_config = get_src_build_config()
    self.source_backend_factory = source_backend_factory
    self.target_backend_factory = target_backend_factory
    chosen = builder_class or DataBuilder
    # always keep a list of classes, whatever form was passed
    self.arg_builder_classes = chosen if isinstance(chosen, list) else [chosen]
    self.default_builder_class = self.arg_builder_classes[0] or DataBuilder
    # filled by find_builder_classes()
    self.builder_classes = {}
    self.poll_schedule = poll_schedule
    self.setup_log()
[docs]
def clean_stale_status(self):
    """
    Mark as "canceled" every job still flagged "building" in build documents.

    A warning is logged for each such job and the build document is written
    back only when at least one of its jobs was changed.
    """
    builds = get_src_build()
    for build_doc in builds.find():
        canceled = 0
        for job in build_doc.get("jobs", []):
            if job.get("status") != "building":
                continue
            logging.warning("Found stale build '%s', marking build status as 'canceled'" % build_doc["_id"])
            job["status"] = "canceled"
            canceled += 1
        if canceled:
            builds.replace_one({"_id": build_doc["_id"]}, build_doc)
@property
def source_backend(self):
    """
    Source backend handed to builders.

    Uses the result of source_backend_factory() when a factory is set and
    returns a truthy value; otherwise a partial of SourceDocMongoBackend
    wired with partials of the hub database accessors.
    """
    factory = self.source_backend_factory
    custom = factory() if factory else None
    if custom:
        return custom
    return partial(
        SourceDocMongoBackend,
        build_config=partial(get_src_build_config),
        build=partial(get_src_build),
        master=partial(get_src_master),
        dump=partial(get_src_dump),
        sources=partial(mongo.get_src_db),
    )
@property
def target_backend(self):
    """
    Target backend handed to builders.

    Uses the result of target_backend_factory() when a factory is set and
    returns a truthy value; otherwise a partial of TargetDocMongoBackend
    bound to a partial of mongo.get_target_db.
    """
    factory = self.target_backend_factory
    custom = factory() if factory else None
    if custom:
        return custom
    return partial(TargetDocMongoBackend, target_db=partial(mongo.get_target_db))
[docs]
def get_builder_class(self, build_config_name):
    """
    Pick the builder class for build configuration `build_config_name`.

    Resolution order:
    1. the "builder_class" key of the build configuration document, looked
       up in self.builder_classes
    2. the manager's default builder class
    3. DataBuilder
    """
    conf = self.src_build_config.find_one({"_id": build_config_name})
    class_path = conf.get("builder_class")
    if class_path:
        return self.builder_classes[class_path]["class"]
    return self.default_builder_class or DataBuilder
[docs]
def register_builder(self, build_name):
    """
    Register a factory for the builder of build configuration `build_name`.

    Only a partial is stored in self.register: the builder (and the backends
    it needs) is created when that partial is called, so no object is kept
    around between registration and actual use.
    """

    def create(build_name):
        # imported at call time so the application had a chance to set the config up
        from biothings import config

        builder_class = self.get_builder_class(build_name)
        self.logger.info("Build config '%s' will use builder class %s", build_name, builder_class)
        return builder_class(
            build_name,
            source_backend=self.source_backend,
            target_backend=self.target_backend,
            log_folder=config.LOG_FOLDER,
        )

    self.register[build_name] = partial(create, build_name)
[docs]
def get_builder(self, col_name):
    """
    Return a builder instance set up from the existing build named `col_name`.

    The builder is created for the build configuration name stored in the
    build document, then its build_config is replaced by the stored one and
    its target backend is pointed at `col_name`.
    Raises BuilderException when no build document has that name.
    """
    build_doc = get_src_build().find_one({"_id": col_name})
    if not build_doc:
        raise BuilderException("No such build named '%s'" % repr(col_name))
    assert "build_config" in build_doc, "Expecting build_config information"
    stored_conf = build_doc["build_config"]
    builder_class = self.get_builder_class(stored_conf["name"])
    builder = builder_class(
        stored_conf["name"],
        source_backend=self.source_backend,
        target_backend=self.target_backend,
        log_folder=btconfig.LOG_FOLDER,
    )
    # overwrite with the values recorded for that build
    builder.build_config = stored_conf
    builder.target_backend.set_target_name(col_name)
    return builder
[docs]
def delete_merged_data(self, merge_name):
    """Drop the collection named `merge_name` from the target database."""
    mongo.get_target_db()[merge_name].drop()
[docs]
def delete_merge(self, merge_name):
    """
    Delete a merged collection and its build document.

    A warning is logged when no build document exists for `merge_name`;
    the merged data is dropped in both cases.
    """
    builds = get_src_build()
    if builds.find_one({"_id": merge_name}):
        builds.remove({"_id": merge_name})
    else:
        self.logger.warning("No metadata found for merged collection '%s'", merge_name)
    self.delete_merged_data(merge_name)
[docs]
def archive_merge(self, merge_name):
    """
    Archive a merge: keep its build document, drop its merged data.

    The build document gets an "archived" field set to the current datetime.
    A warning is logged when no build document exists for `merge_name`;
    the merged data is dropped in both cases.
    """
    builds = get_src_build()
    build_doc = builds.find_one({"_id": merge_name})
    if not build_doc:
        self.logger.warning("No metadata found for merged collection '%s'", merge_name)
    else:
        build_doc["archived"] = datetime.now()
        builds.replace_one({"_id": merge_name}, build_doc)
    self.delete_merged_data(merge_name)
[docs]
def get_query_for_list_merge(self, only_archived, status=None):
    """
    Build the query selecting builds by archive state and, optionally, status.

    only_archived selects documents having (true) or lacking (false) the
    "archived" field. status "success" excludes builds with a "failed" or
    "canceled" job; "failed" or "canceled" selects builds having a job with
    that status; any other value adds no status criterion.
    """
    query = {"archived": {"$exists": 1 if only_archived else 0}}
    if not status:
        return query
    if status == "success":
        query["jobs.status"] = {"$not": {"$in": ["failed", "canceled"]}}
    elif status in ("failed", "canceled"):
        query["jobs.status"] = status
    return query
[docs]
def list_merge(self, build_config=None, only_archived=False):
    """
    List build names, grouped by build configuration name.

    With `build_config`, return the sorted list of build names for that
    configuration only; otherwise return a dict mapping each configuration
    name (None when a build has none recorded) to its sorted build names.
    only_archived switches between archived and non-archived builds.
    """
    query = self.get_query_for_list_merge(only_archived)
    names_by_conf = {}
    for build_doc in get_src_build().find(query):
        conf_name = build_doc.get("build_config", {}).get("name", None)
        names_by_conf.setdefault(conf_name, []).append(build_doc["_id"])
    if build_config:
        return sorted(names_by_conf.get(build_config, []))
    return {conf_name: sorted(names) for conf_name, names in names_by_conf.items()}
[docs]
def setup_log(self):
    """Attach the "buildmanager" logger and its log file to the manager."""
    logger, logfile = get_logger("buildmanager")
    self.logger = logger
    self.logfile = logfile
def __getitem__(self, build_name):
    """
    Return a builder instance for the build named 'build_name'.
    Note: the registered item is a factory, so each access calls it again
    and hands back whatever that call returns.
    """
    builder_factory = BaseManager.__getitem__(self, build_name)
    builder = builder_factory()
    return builder
[docs]
def resolve_builder_class(self, klass):
    """
    Resolve class/partial definition to (obj, "type", "mod.class")
    where names (class name, module, docstring, etc...) can
    directly be accessed whether it's a standard class or not.

    `klass` is either a class, or a functools.partial wrapping a class.
    Returns a tuple (class object, "class" or "partial", "module.ClassName").
    Raises TypeError for anything else, including a partial wrapping a
    non-class callable.
    """
    # isinstance() rather than type(...) == type, so classes built with a
    # custom metaclass (e.g. abc.ABCMeta) are recognized as classes too
    if isinstance(klass, partial):
        obj = klass.func
        if not isinstance(obj, type):
            raise TypeError("Unknown type for builder %s" % repr(klass))
        btype = "partial"
    elif isinstance(klass, type):
        obj = klass
        btype = "class"
    else:
        raise TypeError("Unknown type for builder %s" % repr(klass))
    classpathstr = "%s.%s" % (obj.__module__, obj.__name__)
    return (obj, btype, classpathstr)
[docs]
def find_builder_classes(self):
    """
    Collect all available builder classes into self.builder_classes:
    1. classes passed during manager init (that includes the default builder)
    2. all classes subclassing DataBuilder found in:
       a. the current module
       b. hub.databuild (app-specific), when it can be imported

    Each entry is keyed by the class path string and holds a description
    (flattened docstring), the type ("class" or "partial"), the class itself
    and whether it is the default builder. Classes that can't be resolved
    are logged and skipped.
    """
    candidates = set(self.arg_builder_classes)
    modules = [sys.modules[__name__]]
    try:
        import hub.databuild as app_databuild
    except ImportError:
        pass
    else:
        modules.append(app_databuild)
    candidates.update(find_classes_subclassing(modules, DataBuilder))
    for klass in candidates:
        try:
            obj, btype, classpathstr = self.resolve_builder_class(klass)
            doc = obj.__doc__
            if doc:
                # one-line version of the docstring
                helpstr = " ".join(map(str.strip, doc.splitlines()))
            else:
                helpstr = doc
            self.builder_classes[classpathstr] = {
                "desc": helpstr,
                "type": btype,
                "class": klass,
                "default": klass == self.default_builder_class,
            }
        except Exception as e:
            logging.exception("Can't extract information from builder class %s: %s" % (repr(klass), e))
[docs]
def merge(self, build_name, sources=None, target_name=None, **kwargs):
    """
    Trigger a merge for build named 'build_name' and return what the
    builder's merge() returns.

    `sources` and `target_name` are passed to the builder as-is, along with
    the manager's job_manager and any extra keyword arguments.
    Raises BuilderException when a KeyError occurs while getting the builder
    or running its merge, or when a ResourceNotReady is raised.
    """
    try:
        builder = self[build_name]
        return builder.merge(sources, target_name, job_manager=self.job_manager, **kwargs)
    except KeyError:
        raise BuilderException("No such builder for '%s'" % build_name)
    except ResourceNotReady as e:
        raise BuilderException(f"Some datasources aren't ready for the merge: {e}")
[docs]
def list_sources(self, build_name):
    """
    Return the "sources" of build configuration 'build_name', or an empty
    list when the configuration doesn't exist or lists no sources.
    """
    conf = self.src_build_config.find_one({"_id": build_name})
    if not conf:
        return []
    return conf["sources"] or []
[docs]
def whatsnew(self, build_name=None, old=None):
    """
    Return datasources which have changed since last time
    (last time is datasource information from metadata, either from
    given old src_build doc name, or the latest found if old=None)

    When both build_name and old are None, every build configuration that
    is not archived is checked; only configurations reporting at least one
    changed source are returned, and configurations for which the comparison
    raises BuilderException are skipped.

    The result is a dict keyed by build configuration name. Each value holds
    "old_build" (name and "started_at" of the reference build) and "sources",
    a dict of changed sources with their old and new versions.
    """
    dbbuild = get_src_build()
    dbdump = get_src_dump()

    def whatsnewcomparedto(build_name, old=None):
        # Compare the sources recorded in one reference build with src_dump.
        if old is None:
            # TODO: this will get big... but needs to be generic
            # because we handle different hub db backends (or it needs to be a
            # specific helper func to be defined all backends
            # FIXME: this gets slower as hub gets more builds, we are
            # finding all builds of all build configs when /whatsnew gets
            # requested
            builds = dbbuild.find({"build_config.name": build_name})
            # most recent build first
            builds = sorted(builds, key=lambda e: e["started_at"], reverse=True)
            if not builds:
                raise BuilderException(f"Can't find a build associated to config '{build_name}'")
            # Pickup latest success build
            # (here: the most recent build having a non-empty "_meta.src")
            old = None
            for _build in builds:
                if _build.get("_meta", {}).get("src"):
                    old = _build
                    break
            if not old:
                raise BuilderException(f"Can't find a success build associated to config '{build_name}'")
        else:
            # "old" was given as a build name: replace it with its document
            old = dbbuild.find_one({"_id": old})
        meta_srcs = old.get("_meta", {}).get("src", {})
        new = {
            "old_build": {
                "name": old["_id"],
                "built_at": old["started_at"],
            },
            "sources": {},
        }
        for src_name, data in meta_srcs.items():
            try:
                srcd = dbdump.find_one({"_id": src_name})
                try:
                    # ignore the source (srcd = None) unless its download and
                    # all its upload jobs are in "success" status
                    if srcd and srcd.get("download"):
                        if not srcd["download"]["status"] == "success":
                            srcd = None
                    if srcd and srcd.get("upload"):
                        for sub in srcd["upload"]["jobs"].values():
                            if not sub["status"] == "success":
                                srcd = None
                                break
                except KeyError as e:
                    # srcd is left as it was when the missing key was hit
                    self.logger.warning("whatsnew: src_dump:%s missing keys: %s, not touching", src_name, e)
                if srcd and not srcd.get("download") and srcd.get("upload"):
                    # this is a collection only source, find all releases in sub sources, hopefully all are the same
                    rels = [sub["release"] for sub in srcd["upload"]["jobs"].values()]
                    srels = set(rels)
                    if len(srels) != 1:
                        raise ValueError(
                            "Found different releases in sub-sources, expected only one common: %s" % repr(rels)
                        )
                    rel = rels[0]
                    if data.get("version") and rel != data["version"]:
                        new["sources"][src_name] = {"old": {"version": data["version"]}, "new": {"version": rel}}
                elif (
                    srcd
                    and srcd.get("download", {}).get("release")
                    and srcd["download"]["release"] != data["version"]
                ):
                    # downloaded source whose release differs from the one used in the reference build
                    new["sources"][src_name] = {
                        "old": {"version": data["version"]},
                        "new": {
                            "version": srcd["download"]["release"],
                            "downloaded_at": srcd["download"].get("started_at"),
                        },
                    }
            except Exception as e:
                # best effort: a failure on one source only skips that source
                self.logger.warning("Can't check what's new for source '%s': %s", src_name, e)
        return {build_name: new}

    if old is None and build_name is None:
        # do this for all build configs
        dbbuildconfig = get_src_build_config()
        configs = {}
        # "archived" either missing or explicitly False
        for d in dbbuildconfig.find(
            {
                "$or": [
                    {"archived": {"$exists": 0}},
                    {"archived": False},
                ]
            }
        ):
            try:
                news = whatsnewcomparedto(d["_id"])
                # only report configurations with at least one changed source
                if news[d["_id"]]["sources"]:
                    configs.update(news)
            except BuilderException:
                continue
        return configs
    else:
        return whatsnewcomparedto(build_name, old)
[docs]
def clean_temp_collections(self, build_name, date=None, prefix=""):
    """
    Delete all target collections created from builder named
    "build_name" at given date (or any date if none given -- careful...).
    Date is a string (YYYYMMDD or regex)
    Common collection name prefix can also be specified if needed.

    Collections are matched from the start of their name against
    "<prefix>_<build_name>_<date>_" (prefix and date parts being optional).
    Collections whose name contains "current" are never dropped.
    """
    target_db = mongo.get_target_db()
    # The pattern is the same for every collection: build and compile it once.
    # NOTE(review): prefix and build_name are used as regex text without
    # escaping, like date; confirm names never carry regex metacharacters.
    search = prefix + "_" if prefix else ""
    search += build_name + "_"
    if date:
        search += date + "_"
    pat = re.compile(search)
    for col_name in target_db.collection_names():
        if pat.match(col_name) and "current" not in col_name:
            logging.info("Dropping target collection '%s'", col_name)
            target_db[col_name].drop()
[docs]
def poll(self):
    """
    Schedule automatic builds for build configurations declaring
    {"autobuild": {"schedule": ...}} (aiocron/crontab format), so each build
    configuration can have a different polling schedule. On each trigger,
    whatsnew() is checked and a merge is started when at least one source
    has changed.
    """
    # don't use $exists in find(), not all hub backend implements that
    logger, _ = get_logger("autobuild")
    schedules = {}
    for conf in get_src_build_config().find():
        if conf.get("autobuild", {}).get("schedule"):
            schedules[conf["_id"]] = conf["autobuild"]["schedule"]

    async def _autobuild(conf_name):
        changes = self.whatsnew(conf_name)
        logger.info(f"{conf_name}:{schedules[conf_name]}")
        logger.info(f"{conf_name}:{changes}")
        if not changes[conf_name]["sources"]:
            logger.info(f"{conf_name}:pass")
            return
        self.merge(conf_name)
        logger.info(f"{conf_name}:merge(*)")

    logger.info(datetime.now().astimezone())
    logger.info(schedules)  # all schedules
    for conf_id, cron_spec in schedules.items():
        try:
            aiocron.crontab(
                cron_spec,
                func=partial(_autobuild, conf_id),
                start=True,
                loop=self.job_manager.loop,
            )
        except Exception:
            # a bad schedule must not prevent the other ones from being set
            logger.exception((conf_id, cron_spec))
[docs]
def trigger_merge(self, doc):
    """Trigger a merge for the build named after the "_id" of `doc`."""
    build_name = doc["_id"]
    return self.merge(build_name)
[docs]
def build_config_info(self):
    """
    Describe every registered build configuration and the known builder classes.

    Returns a dict with:
    - "build_configs": per configuration name, the build configuration, an
      "archived" flag, and when available the source/target backend types and
      addresses, the mapper class names, and an "error" message when the
      builder could not be instantiated
    - "builder_classes": a copy of self.builder_classes without the
      (non-serializable) "class" entries
    """
    configs = {}
    for name in self.register:
        # Reset for each configuration: a failure on one builder must not be
        # reported as the error of the configurations processed after it.
        err = None
        try:
            builder = self[name]
        except Exception as e:
            conf = get_src_build_config().find_one({"_id": name})
            # fall back to a fake builder object only exposing the stored configuration
            builder = dotdict({"build_config": conf}) if conf else None
            err = str(e)
        if (
            not builder
            or issubclass(builder.target_backend.__class__, LinkTargetDocMongoBackend)
            or issubclass(builder.__class__, dict)
        ):  # fake builder obj
            target_db = None  # it's not a traditional target database, it's pointing to
            # somewhere else (TODO: maybe LinkTargetDocMongoBackend should
            # implement more methods to return info about that
        else:
            target_db = builder.target_backend.target_collection.database.client.address
        configs[name] = {
            "build_config": builder and builder.build_config,
            "archived": "archived" in (builder and builder.build_config or []),
        }
        if builder and builder.source_backend:
            configs[name]["source_backend"] = {
                "type": builder.source_backend.__class__.__name__,
                "source_db": builder.source_backend.sources.client.address,
            }
        if builder and builder.target_backend:
            configs[name]["target_backend"] = {
                "type": builder.target_backend.__class__.__name__,
                "target_db": target_db,
            }
        if err:
            configs[name]["error"] = err
        if builder and builder.mappers:
            configs[name]["mapper"] = {}
            for mappername, mapper in builder.mappers.items():
                configs[name]["mapper"][mappername] = mapper.__class__.__name__
    res = {"build_configs": configs}
    # dict contains an actual class, non-serializable, so adjust:
    bclasses = copy.deepcopy(self.builder_classes)
    for v in bclasses.values():
        v.pop("class")
    res["builder_classes"] = bclasses
    return res
[docs]
def build_info(
    self,
    id=None,
    conf_name=None,
    fields=None,
    only_archived=False,
    status=None,
):
    """
    Return build information for the build named `id`, or for all builds
    matching the filters when `id` is None.

    - id: build name; when given, a single build document is returned and
      ValueError is raised if it doesn't exist
    - conf_name: adds a criterion on "build_config._id"
    - fields: projection (mongo notation) used when `id` is given; when `id`
      is None it is replaced by a projection excluding "mapping", "sources",
      "build_config.sources" and "build_config.root"
    - only_archived / status: see get_query_for_list_merge() (ignored when
      `id` is given)

    Each returned build gets a global "status" (status of its latest job, or
    "unknown" without jobs) and a "count" of documents. Builds are sorted by
    "started_at", most recent first.
    """
    query = self.get_query_for_list_merge(only_archived=only_archived, status=status)
    if id is not None:
        query = {"_id": id}
    else:
        fields = {
            "mapping": 0,
            "sources": 0,
            "build_config.sources": 0,
            "build_config.root": 0,
        }
    if conf_name is not None:
        query["build_config._id"] = conf_name
    found = list(get_src_build().find(query, fields))
    found.sort(key=lambda e: str(e.get("started_at") or ""), reverse=True)
    # set a global status (ie. latest job's status)
    # + get total #docs
    target_db = mongo.get_target_db()
    for build in found:
        jobs = build.get("jobs", [])
        build["status"] = "unknown"
        if jobs:
            build["status"] = jobs[-1]["status"]
        try:
            build["count"] = create_backend(build["backend_url"]).count()
        except KeyError:
            # fall back to the collection named after the build in the target database
            build["count"] = target_db[build["_id"]].count()
    if not id:
        return found
    if not found:
        raise ValueError("No such build named '%s'" % id)
    return found.pop()
[docs]
def upsert_build_conf(self, name, doc_type, sources, roots, builder_class, params, archived):
    """
    Save build configuration `name` and reconfigure the manager.

    When `builder_class` is empty, the class path string of the manager's
    default builder class is stored instead. "archived" is only set when
    `archived` is true. Keys of `params` are applied last and thus override
    any of the other fields.
    """
    config_col = get_src_build_config()
    if not builder_class:
        # class path string
        builder_class = self.resolve_builder_class(self.default_builder_class)[2]
    conf_doc = {
        "_id": name,
        "name": name,
        "doc_type": doc_type,
        "sources": sources,
        "root": roots,
        "builder_class": builder_class,
    }
    if archived:
        conf_doc["archived"] = True
    conf_doc.update(params)
    config_col.save(conf_doc)
    self.configure()
[docs]
def create_build_configuration(
    self,
    name,
    doc_type,
    sources,
    roots=None,
    builder_class=None,
    params=None,
    archived=False,
):
    """
    Create build configuration `name`.

    Raises ValueError when a configuration with that name already exists,
    otherwise delegates to upsert_build_conf() with `roots` defaulting to an
    empty list and `params` to an empty dict.
    """
    if not roots:
        roots = []
    if not params:
        params = {}
    # check conf doesn't exist yet
    existing = list(get_src_build_config().find({"_id": name}))
    if existing:
        raise ValueError("Configuration named '%s' already exists" % name)
    self.upsert_build_conf(name, doc_type, sources, roots, builder_class, params, archived)
[docs]
def update_build_configuration(
    self,
    name,
    doc_type,
    sources,
    roots=None,
    builder_class=None,
    params=None,
    archived=False,
):
    """
    Update build configuration `name` by delegating to upsert_build_conf(),
    with `roots` defaulting to an empty list and `params` to an empty dict.
    """
    if not roots:
        roots = []
    if not params:
        params = {}
    self.upsert_build_conf(name, doc_type, sources, roots, builder_class, params, archived)
[docs]
def delete_build_configuration(self, name):
    """Remove build configuration `name` and reconfigure the manager."""
    get_src_build_config().remove({"_id": name})
    self.configure()
[docs]
def save_mapping(self, name, mapping=None, dest="build", mode="mapping"):
    """
    Store `mapping` in the build document named `name`.

    - dest="build": saved under the document's "mapping" key
    - dest="inspect": saved under "inspect" -> "results" -> `mode`; a
      ValueError is raised when a KeyError occurs in that step
    Any other `dest` raises ValueError.
    """
    logging.debug("Saving mapping for build '%s' destination='%s':\n%s", name, dest, pformat(mapping))
    builds = get_src_build()
    build_doc = builds.find_one({"_id": name})
    assert build_doc, "Can't find build document for '%s'" % name
    if dest not in ("build", "inspect"):
        raise ValueError("Unknow saving destination: %s" % repr(dest))
    if dest == "build":
        build_doc["mapping"] = mapping
        builds.save(build_doc)
        return
    # dest == "inspect"
    try:
        build_doc["inspect"]["results"][mode] = mapping
        builds.save(build_doc)
    except KeyError:
        raise ValueError("Can't save mapping, document doesn't contain expected inspection data")