# Source code for biothings.hub.databuild.backend
"""
Backend for storing merged genedoc after building.
Support MongoDB, ES, CouchDB
"""
import os
from functools import partial
import biothings.utils.mongo as mongo
from biothings.utils.backend import DocBackendBase, DocESBackend, DocMongoBackend
from biothings.utils.common import get_random_string, get_timestamp
from biothings.utils.es import ESIndexer
from biothings.utils.hub_db import get_source_fullname, get_src_build
# Source specific backend (deals with build config, master docs, etc...)
class SourceDocBackendBase(DocBackendBase):
    """
    Backend giving access to source-level information needed during a build:
    build configuration, build docs, master docs, dump info and the source
    documents themselves.

    Each constructor argument can be passed either as the object itself or as
    a ``functools.partial`` returning it, in which case resolution is deferred
    until the corresponding attribute is first accessed (see ``__getattr__``).
    """

    def __init__(self, build_config, build, master, dump, sources):
        # For each argument keep either a provider (a functools.partial called
        # lazily on first access) or the value itself.
        self._build_config_provider, self._build_config = self._split_provider(build_config)
        self._build_provider, self._build = self._split_provider(build)
        self._master_provider, self._master = self._split_provider(master)
        self._dump_provider, self._dump = self._split_provider(dump)
        self._sources_provider, self._sources = self._split_provider(sources)
        # NOTE: the original code reset self._build_config to None here,
        # discarding any build_config passed directly as a value; removed.
        self.src_masterdocs = None
        # keep track of sources which were accessed (through __getitem__)
        self.sources_accessed = {}

    @staticmethod
    def _split_provider(value):
        """Return a (provider, value) pair: (value, None) when *value* is a
        functools.partial to resolve lazily, (None, value) otherwise."""
        if isinstance(value, partial):
            return value, None
        return None, value

    def __getattr__(self, attr):
        """Lazily resolve build_config/build/dump/master/sources, calling the
        stored provider (if any) on first access and caching the result."""
        if attr not in ["build_config", "build", "dump", "master", "sources"]:
            # original code *returned* the exception instead of raising it,
            # which broke the attribute protocol (hasattr() always True)
            raise AttributeError(attr)
        privattr = "_" + attr
        if getattr(self, privattr) is None:
            provider = getattr(self, privattr + "_provider")
            if provider:
                setattr(self, privattr, provider())
        return getattr(self, privattr)

    def get_build_configuration(self, build_name):
        """Return the build configuration document named *build_name*."""
        raise NotImplementedError("sub-class and implement me")

    def validate_sources(self, sources=None):
        """Check given sources are consistent with the build configuration."""
        raise NotImplementedError("sub-class and implement me")

    def __getitem__(self, src_name):
        # record the access so get_src_metadata() can restrict itself to
        # sources actually used during the merge
        self.sources_accessed[src_name] = 1
        return self.sources[src_name]
# Target specific backend
class TargetDocBackend(DocBackendBase):
    """Base class for backends receiving merged documents."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._target_name = None

    def set_target_name(self, target_name, build_name=None):
        """
        Create/prepare a target backend, either strictly named "target_name"
        or named derived from "build_name" (for temporary backends)
        """
        if target_name:
            self._target_name = target_name
        else:
            self._target_name = self.generate_target_name(build_name)

    @property
    def target_name(self):
        return self._target_name

    def generate_target_name(self, build_config_name):
        """Build a unique, lowercase target name from the build config name,
        a timestamp and a random suffix."""
        assert build_config_name is not None
        parts = (build_config_name, get_timestamp(), get_random_string())
        return "{}_{}_{}".format(*parts).lower()

    def get_backend_url(self):
        """
        Return backend URL (see create_backend() for formats)
        """
        # default is a collection in target database
        return self._target_name
class SourceDocMongoBackend(SourceDocBackendBase):
    # MongoDB implementation of the source backend: build configuration,
    # master docs and dump info live in hub database collections.

    def get_build_configuration(self, build_name):
        # Fetch the build configuration document by _id and cache it on self.
        self._config = self.build_config.find_one({"_id": build_name})
        return self._config

    def validate_sources(self, sources=None):
        # NOTE(review): 'sources' is currently unused; this only checks that
        # a build configuration has been loaded.
        assert self._build_config, "'self._build_config' cannot be empty."

    def get_src_master_docs(self):
        # Lazily load all src_master documents, keyed by _id, and cache them.
        if self.src_masterdocs is None:
            self.src_masterdocs = dict([(src["_id"], src) for src in list(self.master.find())])
        return self.src_masterdocs

    def get_src_metadata(self):
        """
        Return source versions which have been previously accessed with this backend object
        or all source versions if none were accessed. Accessing means going through __getitem__
        (the usual way) and allows to auto-keep track of sources of interest, thus returning
        versions only for those.
        """
        src_version = {}
        # what's registered in each uploader, from src_master.
        # also includes versions
        src_meta = {}
        srcs = []
        if self.sources_accessed:
            # restrict to the main source names of the sources actually accessed
            for src in self.sources_accessed:
                fullname = get_source_fullname(src)
                main_name = fullname.split(".")[0]
                doc = self.dump.find_one({"_id": main_name})
                srcs.append(doc["_id"])
            srcs = list(set(srcs))
        else:
            srcs = [d["_id"] for d in self.dump.find()]
        # we need to return main_source names, but if accessed, it's been through sub-source names
        # query is different in that case
        for src in self.dump.find({"_id": {"$in": srcs}}):
            # now merge other extra information from src_master (src_meta key). src_master _id
            # are sub-source names, not main source so we have to deal with src_dump as well
            # in order to resolve/map main/sub source name
            subsrc_versions = []
            if src and src.get("download"):
                # Store the latest success dump time
                src_meta.setdefault(src["_id"], {})
                last_success = src["download"].get("last_success")
                if not last_success and src["download"].get("status") == "success":
                    # no "last_success" recorded: fall back to the start time
                    # of the (successful) dump
                    last_success = src["download"].get("started_at")
                if last_success:
                    src_meta[src["_id"]]["download_date"] = last_success
            if src and src.get("upload"):
                latest_upload_date = None
                meta = {}
                for job_name in src["upload"].get("jobs", {}):
                    job = src["upload"]["jobs"][job_name]
                    # "step" is the actual sub-source name
                    sub_source = job.get("step")
                    docm = self.master.find_one({"_id": sub_source})
                    if docm and docm.get("src_meta"):
                        meta[sub_source] = docm["src_meta"]
                    # Store the latest success upload time
                    # NOTE(review): latest_upload_date is never reassigned, so this
                    # condition is always true; also "jobs" is indexed here by
                    # sub_source ("step") whereas it is keyed by job name above —
                    # presumably they always match, TODO confirm
                    if not latest_upload_date or latest_upload_date < job["started_at"]:
                        step_meta = meta.setdefault(sub_source, {})
                        sub_source_info = src["upload"]["jobs"][sub_source]
                        last_success = sub_source_info.get("last_success")
                        if not last_success and sub_source_info.get("status") == "success":
                            last_success = sub_source_info.get("started_at")
                        if last_success:
                            step_meta["upload_date"] = last_success
                # when more than 1 sub-sources, we can have different version in sub-sources
                # (not normal) if one sub-source uploaded, then dumper downloaded a new version,
                # then the other sub-source uploaded that version. This should never happen, just make sure
                subsrc_versions = [
                    {"sub-source": job.get("step"), "version": job.get("release")}
                    for job in src["upload"].get("jobs", {}).values()
                ]
                assert (
                    len(set([s["version"] for s in subsrc_versions])) == 1
                ), "Expecting one version " + "in upload sub-sources for main source '%s' but got: %s" % (
                    src["_id"],
                    subsrc_versions,
                )
                # usually, url & license are the same whatever the sub-sources are: they
                # share common metadata, and we don't want them repeated for each sub-source.
                # but keys such as "code" are always sub-source specific.
                # here we make sure to factor common keys, keeping specific ones at sub-level
                if len(meta) > 1:
                    common = {}
                    # reference sub-source to compare the others against
                    # NOTE(review): "any" shadows the builtin of the same name
                    any = list(meta)[0]
                    topop = []  # common keys
                    for anyk in meta[any]:
                        # a key is common when every sub-source agrees with the reference
                        if len({meta[s].get(anyk) == meta[any][anyk] for s in meta}) == 1:
                            topop.append(anyk)
                    for k in topop:
                        common[k] = meta[any][k]
                        [meta[subname].pop(k, None) for subname in meta]
                    for k, v in common.items():
                        src_meta.setdefault(src["_id"], {}).setdefault(k, v)
                    for subname in meta:
                        for k, v in meta[subname].items():
                            src_meta.setdefault(src["_id"], {}).setdefault(k, {}).setdefault(subname, v)
                # we have metadata, but just one (ie. no sub-source), don't display it
                elif meta:
                    assert len(meta) == 1
                    subname, metad = meta.popitem()
                    for k, v in metad.items():
                        src_meta.setdefault(src["_id"], {}).setdefault(k, v)
            if subsrc_versions:
                # all versions are identical (asserted above): take the first one
                version = subsrc_versions[0]["version"]
                src_version[src["_id"]] = version
                src_meta.setdefault(src["_id"], {}).setdefault("version", version)
        return src_meta
class TargetDocMongoBackend(TargetDocBackend, DocMongoBackend):
    """Mongo target backend: merged docs go to a collection in target_db."""

    def set_target_name(self, target_name=None, build_name=None):
        # resolve the target name first, then bind the matching collection
        super().set_target_name(target_name, build_name)
        self.target_collection = self.target_db[self._target_name]
class ShardedTargetDocMongoBackend(TargetDocMongoBackend):
    """Same as TargetDocMongoBackend, but shards the target collection."""

    def prepare(self):
        # shard on hashed _id so writes are spread evenly across shards
        assert self.target_name, "target_name not set"
        assert self.target_db, "target_db not set"
        namespace = "%s.%s" % (self.target_db.name, self.target_name)
        admin_db = self.target_db.client.admin
        admin_db.command("shardCollection", namespace, key={"_id": "hashed"})
class LinkTargetDocMongoBackend(TargetDocBackend):
    """
    This backend type act as a dummy target backend, the data
    is actually stored in source database. It means only one
    datasource can be linked to that target backend, as a consequence,
    when this backend is used in a merge, there's no actual data
    merge. This is useful when "merging/indexing" only one datasource,
    where the merge step is just a duplication of datasource data.
    """

    name = "link"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # both attributes are set later by LinkDataBuilder:
        # source_db is the src database client, datasource_name the name
        # of the linked datasource in that database
        self.source_db = None
        self.datasource_name = None

    @property
    def target_collection(self):
        # the "target" is really the linked source collection
        assert self.source_db
        assert self.datasource_name
        return self.source_db[self.datasource_name]
def create_backend(db_col_names, name_only=False, follow_ref=False, **kwargs):
    """
    Guess what's inside 'db_col_names' and return the corresponding backend.

    - It could be a string (will first check for an src_build doc to check
      a backend_url field, if nothing there, will lookup a mongo collection
      in target database)
    - or a tuple("target|src","col_name")
    - or a ("mongodb://user:pass@host","db","col_name") URI.
    - or a ("es_host:port","index_name","doc_type")

    If name_only is true, just return the name uniquely identifying the
    collection or index URI connection.
    """
    col = None
    db = None
    is_mongo = True
    if isinstance(db_col_names, str):
        # first check build doc: if there's a backend_url key, use it instead of
        # directly using db_col_names as target collection (see LinkDataBuilder)
        bdoc = get_src_build().find_one({"_id": db_col_names})
        if follow_ref and bdoc and bdoc.get("backend_url") and bdoc["backend_url"] != db_col_names:
            return create_backend(bdoc["backend_url"], name_only=name_only, follow_ref=follow_ref, **kwargs)
        else:
            db = mongo.get_target_db()
            col = db[db_col_names]
            # normalize params
            db_col_names = ["%s:%s" % (db.client.HOST, db.client.PORT), db.name, col.name]
    elif db_col_names[0].startswith("mongodb://"):
        # full mongo URI + db + collection
        assert len(db_col_names) == 3, "Missing connection information for %s" % repr(db_col_names)
        conn = mongo.MongoClient(db_col_names[0])
        db = conn[db_col_names[1]]
        col = db[db_col_names[2]]
        # normalize params
        db_col_names = ["%s:%s" % (db.client.HOST, db.client.PORT), db.name, col.name]
    elif len(db_col_names) == 3 and ":" in db_col_names[0]:
        # ("host:port", index, doc_type) -> Elasticsearch
        is_mongo = False
        idxr = ESIndexer(
            index=db_col_names[1],
            doc_type=db_col_names[2],
            es_host=db_col_names[0],
            **kwargs,
        )
        db = idxr
        col = db_col_names[1]
    else:
        assert len(db_col_names) == 2, "Missing connection information for %s" % repr(db_col_names)
        # explicit conditional replaces the old "cond and a or b" trick, which
        # breaks on objects refusing truth testing (e.g. pymongo databases)
        db = mongo.get_target_db() if db_col_names[0] == "target" else mongo.get_src_db()
        col = db[db_col_names[1]]
        # normalize params (0:host, 1:port)
        db_col_names = ["%s:%s" % (db.client.HOST, db.client.PORT), db.name, col.name]
    assert col is not None, "Could not create collection object from %s" % repr(db_col_names)
    if name_only:
        prefix = "mongo" if is_mongo else "es"
        return "%s_%s_%s_%s" % (prefix, db_col_names[0].replace(":", "_"), db_col_names[1], db_col_names[2])
    else:
        return DocMongoBackend(db, col) if is_mongo else DocESBackend(db)
def generate_folder(root_folder, old_db_col_names, new_db_col_names):
    """Return the diff folder path, named after the old and new backends."""
    new_name = create_backend(new_db_col_names, name_only=True)
    old_name = create_backend(old_db_col_names, name_only=True)
    return os.path.join(root_folder, "%s-%s" % (old_name, new_name))
def merge_src_build_metadata(build_docs):
    """
    Merge metadata from src_build documents. A list of docs
    should be passed, the order is important: the 1st element
    has the least precedence, the last the most. It means that,
    when needed, some values from documents on the "left" of the
    list may be overridden by ones on the right.
    Ex: build_version field
    Ideally, build docs shouldn't have any sources in common to
    prevent any unexpected conflicts...

    Note: the first document's "_meta" dict is updated in place
    and also returned.
    """
    assert (
        isinstance(build_docs, list) and len(build_docs) >= 2
    ), "More than one build document must be passed in order to merge metadata"
    first = build_docs[0]
    meta = first.setdefault("_meta", {})
    for new_doc in build_docs[1:]:
        for k, v in new_doc.get("_meta", {}).items():
            if isinstance(v, dict):
                # src_version, src, stats: merge dict contents key by key
                meta.setdefault(k, {}).update(v)
            else:
                # build_version, build_date: override
                meta[k] = v
    # TODO: some fields in stats don't make sense when merged: total, observed/vcf in mv
    return meta