import asyncio
import copy
import datetime
import inspect
import os
import time
from functools import partial
from biothings import config
from biothings.hub import BUILDER_CATEGORY, DUMPER_CATEGORY, UPLOADER_CATEGORY
from biothings.utils.common import get_random_string, get_timestamp, timesofar
from biothings.utils.hub_db import get_src_conn, get_src_dump, get_src_master
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseSourceManager, ResourceNotFound
from biothings.utils.storage import (
BasicStorage,
IgnoreDuplicatedStorage,
MergerStorage,
NoBatchIgnoreDuplicatedStorage,
NoStorage,
)
from biothings.utils.version import get_source_code_info
from biothings.utils.workers import upload_worker
logging = config.logger
class ResourceNotReady(Exception):
    """Raised when a datasource is not in a state allowing it to be uploaded
    (no data folder, no successful dump, or an upload already in progress)."""
class ResourceError(Exception):
    """Raised when an upload operation fails (e.g. missing or empty temp collection)."""
[docs]
class BaseSourceUploader(object):
    """
    Default datasource uploader. Database storage can be done
    in batch or line by line. Duplicated records are not allowed.
    """

    # TODO: fix this delayed import
    from biothings import config

    # database in which uploaded data collections are stored
    __database__ = config.DATA_SRC_DATABASE
    # define storage strategy, override in subclass as necessary
    storage_class = BasicStorage
    # Will be overridden in subclasses:
    # name of the resource and collection name used to store data
    # (see regex_name though for exceptions)
    name = None
    # if several resources, this one is the main name,
    # it's also the _id of the resource in src_dump collection
    # if set to None, it will be set to the value of variable "name"
    main_source = None
    # in case resource uses split collections (so data is spread across
    # different collections), regex_name should be specified so all those split
    # collections can be found using it (used when selecting mappers for instance)
    regex_name = None
    keep_archive = 10  # number of archived collections to keep. Oldest get dropped first.
    def __init__(self, db_conn_info, collection_name=None, log_folder=None, *args, **kwargs):
        """db_conn_info is a database connection info tuple (host,port) to fetch/store
        information about the datasource's state.

        :param collection_name: target collection name; defaults to class attribute "name"
        :param log_folder: folder for the upload log; defaults to config.LOG_FOLDER
        """
        # non-pickable attributes (see __getattr__, prepare() and unprepare())
        self.init_state()
        self.db_conn_info = db_conn_info
        self.timestamp = datetime.datetime.now()
        self.t0 = time.time()  # start time, used to compute upload duration
        # main_source at object level so it's part of pickling data
        # otherwise it won't be set properly when using multiprocessing
        # note: "name" is always defined at class level so pickle knows
        # how to restore it
        self.main_source = self.__class__.main_source or self.__class__.name
        self.log_folder = log_folder or config.LOG_FOLDER
        self.logfile = None
        self.temp_collection_name = None  # set by make_temp_collection()
        self.collection_name = collection_name or self.name
        self.data_folder = None  # set in prepare(), from the src_dump doc
        self.prepared = False
        self.src_doc = {}  # will hold src_dump's doc
@property
def fullname(self):
if self.main_source != self.name:
name = "%s.%s" % (self.main_source, self.name)
else:
name = self.name
return name
[docs]
@classmethod
def create(klass, db_conn_info, *args, **kwargs):
"""
Factory-like method, just return an instance of this uploader
(used by SourceManager, may be overridden in sub-class to generate
more than one instance per class, like a true factory.
This is usefull when a resource is splitted in different collection but the
data structure doesn't change (it's really just data splitted accros
multiple collections, usually for parallelization purposes).
Instead of having actual class for each split collection, factory
will generate them on-the-fly.
"""
return klass(db_conn_info, *args, **kwargs)
[docs]
def init_state(self):
self._state = {
"db": None,
"conn": None,
"collection": None,
"src_dump": None,
"logger": None,
}
[docs]
def prepare(self, state={}): # noqa: B006
"""Sync uploader information with database (or given state dict)"""
if self.prepared:
return
if state:
# let's be explicit, _state takes what it wants
for k in self._state:
self._state[k] = state[k]
return
self._state["conn"] = get_src_conn()
self._state["db"] = self._state["conn"][self.__class__.__database__]
self._state["collection"] = self._state["db"][self.collection_name]
self._state["src_dump"] = self.prepare_src_dump()
self._state["src_master"] = get_src_master()
self._state["logger"], self.logfile = self.setup_log()
self.data_folder = self.src_doc.get("download", {}).get("data_folder") or self.src_doc.get("data_folder")
# flag ready
self.prepared = True
[docs]
def unprepare(self):
"""
reset anything that's not pickable (so self can be pickled)
return what's been reset as a dict, so self can be restored
once pickled
"""
state = {
"db": self._state["db"],
"conn": self._state["conn"],
"collection": self._state["collection"],
"src_dump": self._state["src_dump"],
"src_master": self._state["src_master"],
"logger": self._state["logger"],
}
for k in state:
self._state[k] = None
self.prepared = False
return state
[docs]
def get_predicates(self):
"""
Return a list of predicates (functions returning true/false, as in math logic)
which instructs/dictates if job manager should start a job (process/thread)
"""
def no_dumper_running(job_manager):
"""
Dumpers could change the files uploader is currently using
"""
return (
len(
[
j
for j in job_manager.jobs.values()
if j["source"] == self.fullname.split(".")[0] and j["category"] == DUMPER_CATEGORY
]
)
== 0
)
def no_builder_running(job_manager):
"""
Builders (mergers) read data from single datasource under control of uploader
don't change the data while it's being used
"""
return len([j for j in job_manager.jobs.values() if j["category"] == BUILDER_CATEGORY]) == 0
# TODO: can't use this one below for parallized uploader
# def no_same_uploader_running(job_manager):
# """
# Avoid collision at mongo's level (and what's the point anyway?)
# """
# return len([j for j in job_manager.jobs.values() if \
# j["source"] == self.fullname and j["category"] == UPLOADER_CATEGORY]) == 0
return [no_dumper_running, no_builder_running]
[docs]
def get_pinfo(self):
"""
Return dict containing information about the current process
(used to report in the hub)
"""
pinfo = {
"category": UPLOADER_CATEGORY,
"source": self.fullname,
"step": "",
"description": "",
}
preds = self.get_predicates()
if preds:
pinfo["__predicates__"] = preds
return pinfo
[docs]
def check_ready(self, force=False):
if not self.src_doc:
raise ResourceNotReady(f"Missing information for source '{self.main_source}' to start upload")
if not self.src_doc.get("download", {}).get("data_folder"):
raise ResourceNotReady("No data folder found for resource '%s'" % self.name)
if not force and not self.src_doc.get("download", {}).get("status") == "success":
raise ResourceNotReady("No successful download found for resource '%s'" % self.name)
if not os.path.exists(self.data_folder):
raise ResourceNotReady(f"Data folder '{self.data_folder}' doesn't exist for resource '{self.name}'")
job = self.src_doc.get("upload", {}).get("job", {}).get(self.name)
if not force and job:
raise ResourceNotReady(f"Resource '{self.name}' is already being uploaded (job: {job})")
[docs]
def load_data(self, data_path):
"""
Parse data from data_path and return structure ready to be inserted in database
In general, data_path is a folder path.
But in parallel mode (use parallelizer option), data_path is a file path
:param data_path: It can be a folder path or a file path
:return: structure ready to be inserted in database
"""
raise NotImplementedError("Implement in subclass")
[docs]
@classmethod
def get_mapping(self):
"""Return ES mapping"""
return {} # default to nothing...
[docs]
def make_temp_collection(self):
"""Create a temp collection for dataloading, e.g., entrez_geneinfo_INEMO."""
if self.temp_collection_name:
# already set
return
self.temp_collection_name = self.collection_name + "_temp_" + get_random_string()
return self.temp_collection_name
[docs]
    def clean_archived_collections(self):
        """Drop the oldest archived collections, keeping the keep_archive most
        recent ones, and drop any leftover temp collections."""
        # archived collections look like "<name>_archive_<YYYYMMDD>_<random>"
        # NOTE(review): prefixes are built from self.name while
        # switch_collection() archives under self.collection_name; these only
        # match when collection_name == name (the default) — confirm behavior
        # for uploaders passing a custom collection_name
        prefix = "%s_archive_" % self.name
        cols = [c for c in self.db.collection_names() if c.startswith(prefix)]
        tmp_prefix = "%s_temp_" % self.name
        tmp_cols = [c for c in self.db.collection_names() if c.startswith(tmp_prefix)]
        # timestamp is what's after _archive_, YYYYMMDD, so we can sort it safely
        cols = sorted(cols, reverse=True)
        to_drop = cols[self.keep_archive :] + tmp_cols  # noqa: E203
        for colname in to_drop:
            self.logger.info("Cleaning old archive/temp collection '%s'" % colname)
            self.db[colname].drop()
[docs]
    def switch_collection(self):
        """after a successful loading, rename temp_collection to regular collection name,
        and renaming existing collection to a temp name for archiving purpose.

        Raises ResourceError if the temp collection is missing or empty.
        """
        if self.temp_collection_name and self.db[self.temp_collection_name].count() > 0:
            if self.collection_name in self.db.collection_names():
                # renaming existing collections
                new_name = "_".join([self.collection_name, "archive", get_timestamp(), get_random_string()])
                self.logger.info(
                    "Renaming collection '%s' to '%s' for archiving purpose." % (self.collection_name, new_name)
                )
                # dropTarget=True: overwrite a previous archive with the same name, if any
                self.collection.rename(new_name, dropTarget=True)
            # promote the temp collection as the live one
            self.logger.info("Renaming collection '%s' to '%s'", self.temp_collection_name, self.collection_name)
            self.db[self.temp_collection_name].rename(self.collection_name)
        else:
            raise ResourceError("No temp collection (or it's empty)")
[docs]
def post_update_data(self, steps, force, batch_size, job_manager, **kwargs):
"""Override as needed to perform operations after
data has been uploaded"""
pass
[docs]
async def update_data(self, batch_size, job_manager):
"""
Iterate over load_data() to pull data and store it
"""
pinfo = self.get_pinfo()
pinfo["step"] = "update_data"
got_error = False
self.unprepare()
job = await job_manager.defer_to_process(
pinfo,
partial(
upload_worker,
self.fullname,
self.__class__.storage_class,
self.load_data,
self.temp_collection_name,
batch_size,
1, # no batch, just #1
self.data_folder,
),
)
def uploaded(f):
nonlocal got_error
if type(f.result()) != int:
got_error = Exception(f"upload error (should have a int as returned value got {repr(f.result())}")
job.add_done_callback(uploaded)
await job
if got_error:
raise got_error
self.switch_collection()
[docs]
def generate_doc_src_master(self):
_doc = {
"_id": str(self.name),
"name": self.regex_name and self.regex_name or str(self.name),
"timestamp": datetime.datetime.now(),
}
# store mapping
_map = self.__class__.get_mapping()
if _map:
_doc["mapping"] = _map
# type of id being stored in these docs
if hasattr(self.__class__, "__metadata__"):
_doc.update(self.__class__.__metadata__)
# try to find information about the uploader source code
from biothings.hub.dataplugin.assistant import AssistedUploader
if issubclass(self.__class__, AssistedUploader):
# it's a plugin, we'll just point to the plugin folder
src_file = self.__class__.DATA_PLUGIN_FOLDER
else:
src_file = inspect.getfile(self.__class__)
info = get_source_code_info(src_file)
if info:
_doc.setdefault("src_meta", {}).update({"code": info})
return _doc
[docs]
def get_current_and_new_master(self):
new = self.generate_doc_src_master() or {}
dkey = {"_id": new["_id"]}
current = self.src_master.find_one(dkey) or {}
if current.get("src_meta") != new.get("src_meta"):
return {
"kclass": f"{self.__class__.__module__}.{self.__class__.__name__}",
"current": current.get("src_meta"),
"new": new.get("src_meta"),
}
[docs]
def update_master(self):
_doc = self.generate_doc_src_master()
self.save_doc_src_master(_doc)
[docs]
    def save_doc_src_master(self, _doc):
        """Insert or update (upsert-like) the given master document in src_master."""
        dkey = {"_id": _doc["_id"]}
        prev = self.src_master.find_one(dkey)
        if prev:
            # NOTE(review): legacy update() with $set while insertion uses
            # insert_one(); presumably the hub_db backend still exposes
            # update() — confirm before porting to a newer pymongo
            self.src_master.update(dkey, {"$set": _doc})
        else:
            self.src_master.insert_one(_doc)
[docs]
    def register_status(self, status, subkey="upload", **extra):
        """
        Register step status, ie. status for a sub-resource

        :param status: "…ing" statuses (e.g. "uploading") record in-progress
            info; any other value is treated as a final state and records
            time spent, release, data folder, etc.
        :param subkey: top-level key in the src_dump doc under which job info
            is stored (default "upload")
        :param extra: additional fields merged into the job's info dict
        """
        upload_info = {"status": status}
        upload_info.update(extra)
        job_key = "%s.jobs.%s" % (subkey, self.name)
        # TODO: should use the same approach as Builder.register_status
        # with arguments like 'init' and 'transient'...
        if status.endswith("ing"):
            # record some "in-progress" information
            upload_info["step"] = self.name  # this is the actual collection name
            upload_info["temp_collection"] = self.temp_collection_name
            upload_info["pid"] = os.getpid()
            upload_info["logfile"] = self.logfile
            upload_info["started_at"] = datetime.datetime.now().astimezone()
            # We should use the last_success from the last upload time as a default value for the current's last_success
            # If last_success from the last upload doesn't exist or is None, and last upload's status is success,
            # the last upload's started_at will be used.
            last_upload_info = self.src_doc.get(subkey, {}).get("jobs", {}).setdefault(self.name, {})
            last_success = last_upload_info.get("last_success")
            last_status = last_upload_info.get("status")
            if not last_success and last_status == "success":
                last_success = last_upload_info.get("started_at")
            if last_success:
                upload_info["last_success"] = last_success
            self.src_dump.update_one({"_id": self.main_source}, {"$set": {job_key: upload_info}})
        else:
            # get release that's been uploaded from download part
            src_doc = self.src_dump.find_one({"_id": self.main_source}) or {}
            # back-compatibility while searching for release
            release = src_doc.get("download", {}).get("release") or src_doc.get("release")
            data_folder = src_doc.get("download", {}).get("data_folder") or src_doc.get("data_folder")
            # only register time when it's a final state
            # also, keep previous uploading information
            upd = {}
            for k, v in upload_info.items():
                upd["%s.%s" % (job_key, k)] = v
            t1 = round(time.time() - self.t0, 0)
            upd["%s.status" % job_key] = status
            upd["%s.time" % job_key] = timesofar(self.t0)
            upd["%s.time_in_s" % job_key] = t1
            upd["%s.step" % job_key] = self.name  # collection name
            upd["%s.release" % job_key] = release
            upd["%s.data_folder" % job_key] = data_folder
            # Update last success upload time only on success
            if status == "success":
                upd["%s.last_success" % job_key] = (src_doc["upload"]["jobs"].get(self.name) or {}).get("started_at")
            self.src_dump.update_one({"_id": self.main_source}, {"$set": upd})
[docs]
async def load(
self,
steps=("data", "post", "master", "clean"),
force=False,
batch_size=10000,
job_manager=None,
**kwargs,
):
"""
Main resource load process, reads data from doc_c using chunk sized as batch_size.
steps defines the different processes used to laod the resource:
- "data" : will store actual data into single collections
- "post" : will perform post data load operations
- "master" : will register the master document in src_master
"""
try:
# check what to do
if isinstance(steps, tuple):
steps = list(
steps
) # may not be necessary, but previous steps default is a list, so let's be consistent
elif isinstance(steps, str):
steps = steps.split(",")
update_data = "data" in steps
update_master = "master" in steps
post_update_data = "post" in steps
clean_archives = "clean" in steps
strargs = "[steps=%s]" % ",".join(steps)
cnt = None
if not self.temp_collection_name:
self.make_temp_collection()
if self.db[self.temp_collection_name]:
self.db[self.temp_collection_name].drop() # drop all existing records just in case.
# sanity check before running
self.check_ready(force)
self.logger.info("Uploading '%s' (collection: %s)" % (self.name, self.collection_name))
self.register_status("uploading")
if update_data:
# unsync to make it pickable
state = self.unprepare()
cnt = await self.update_data(batch_size, job_manager, **kwargs)
self.prepare(state)
if update_master:
self.update_master()
if post_update_data:
got_error = False
self.unprepare()
pinfo = self.get_pinfo()
pinfo["step"] = "post_update_data"
f2 = await job_manager.defer_to_thread(
pinfo,
partial(self.post_update_data, steps, force, batch_size, job_manager, **kwargs),
)
def postupdated(f):
nonlocal got_error
if f.exception():
got_error = f.exception()
f2.add_done_callback(postupdated)
await f2
if got_error:
raise got_error
# take the total from update call or directly from collection
cnt = cnt or self.db[self.collection_name].count()
if clean_archives:
self.clean_archived_collections()
self.register_status("success", count=cnt, err=None, tb=None)
self.logger.info("success %s" % strargs, extra={"notify": True})
except Exception as e:
self.logger.exception("failed %s: %s" % (strargs, e), extra={"notify": True})
import traceback
self.logger.error(traceback.format_exc())
self.register_status("failed", err=str(e), tb=traceback.format_exc())
raise
[docs]
def prepare_src_dump(self):
"""Sync with src_dump collection, collection information (src_doc)
Return src_dump collection"""
src_dump = get_src_dump()
self.src_doc = src_dump.find_one({"_id": self.main_source}) or {}
return src_dump
[docs]
def setup_log(self):
log_folder = os.path.join(config.LOG_FOLDER, "dataload") if config.LOG_FOLDER else None
return get_logger("upload_%s" % self.fullname, log_folder=log_folder)
    def __getattr__(self, attr):
        """Catch access to unpickable attributes; if unset, call prepare()
        to restore them from the database."""
        # tricky: self._state will always exist when the instance is created
        # through __init__(). But... when pickling the instance, __setstate__
        # is used to restore attributes on an instance that hasn't been through
        # the __init__() constructor. So we raise an error here to tell pickle not
        # to restore this attribute (it'll be set after)
        if attr == "_state":
            raise AttributeError(attr)
        if attr in self._state:
            if not self._state[attr]:
                # lazily restore db connections / logger
                self.prepare()
            return self._state[attr]
        else:
            raise AttributeError(attr)
class NoBatchIgnoreDuplicatedSourceUploader(BaseSourceUploader):
    """Same as the default uploader, except duplicated-record errors are
    ignored (use with caution...). Storage is done one record at a time
    (slow, no batching) which preserves the order of data in the input file.
    """

    storage_class = NoBatchIgnoreDuplicatedStorage
class IgnoreDuplicatedSourceUploader(BaseSourceUploader):
    """Same as the default uploader, except duplicated-record errors are
    ignored (use with caution...). Storage is done using batched, unordered
    bulk operations.
    """

    storage_class = IgnoreDuplicatedStorage
class MergerSourceUploader(BaseSourceUploader):
    """Uploader using MergerStorage: documents sharing an _id are merged
    together instead of raising a duplicate error."""

    storage_class = MergerStorage
class DummySourceUploader(BaseSourceUploader):
    """
    Dummy uploader: doesn't upload any data, assuming data is already in the
    collection, but makes sure every other bit of information is there for the
    overall process (useful when online data isn't available anymore).
    """

    def prepare_src_dump(self):
        """Sync with src_dump, creating a minimal record when none exists
        (dummy sources have no dumper to create one)."""
        src_dump = get_src_dump()
        self.src_doc = src_dump.find_one({"_id": self.main_source})
        if not self.src_doc:
            # no dump ever happened: initiate a bare src_dump record
            src_dump.save({"_id": self.main_source})
            self.src_doc = src_dump.find_one({"_id": self.main_source})
        return src_dump

    def check_ready(self, force=False):
        """Bypass checks about src_dump (no dump for dummy sources)."""

    async def update_data(self, batch_size, job_manager=None, release=None):
        """Record the given release and verify data is present; upload nothing."""
        assert release is not None, "Dummy uploader requires 'release' argument to be specified"
        self.logger.info("Dummy uploader, nothing to upload")
        # dummy uploaders have no dumper associated b/c it's collection-only resource,
        # so fill minimum information so register_status() can set the proper release
        self.src_dump.update_one({"_id": self.main_source}, {"$set": {"download.release": release}})
        # sanity check, dummy uploader, yes, but make sure data is there
        assert self.collection.count() > 0, "No data found in collection '%s' !!!" % self.collection_name
class ParallelizedSourceUploader(BaseSourceUploader):
    """Uploader splitting the load over multiple processes, one per
    argument tuple returned by jobs()."""

    def jobs(self):
        """Return list of (`*arguments`) passed to self.load_data, in order, for
        each parallelized job. Ex: [(x,1),(y,2),(z,3)]
        If only one argument is required, it still must be passed as a 1-element tuple
        """
        raise NotImplementedError("implement me in subclass")

    async def update_data(self, batch_size, job_manager=None):
        """Submit one upload_worker process per tuple returned by jobs(),
        await them all, then promote the temp collection and clean archives."""
        submitted = []
        job_params = self.jobs()
        got_error = False
        # copy everything needed from self so no reference to self is used below
        fullname = copy.deepcopy(self.fullname)
        storage_class = copy.deepcopy(self.__class__.storage_class)
        load_data = copy.deepcopy(self.load_data)
        temp_collection_name = copy.deepcopy(self.temp_collection_name)
        self.unprepare()
        # important: within this loop, "self" should never be used, to make sure
        # we don't re-instantiate unpickable attributes (via autoset attributes,
        # see prepare()). There could be a race condition where an error causes
        # self to log a statement (logger is unpickable) while another job from
        # the loop is submitted to job_manager, failing because of that logger
        # attribute. In other words: once unprepared, self must not change until
        # all jobs are submitted.
        for batch_num, args in enumerate(job_params):
            pinfo = self.get_pinfo()
            pinfo["step"] = "update_data"
            pinfo["description"] = "%s" % str(args)
            job = await job_manager.defer_to_process(
                pinfo,
                partial(
                    upload_worker,  # pickable worker
                    fullname,  # worker name
                    storage_class,
                    load_data,  # loading func
                    temp_collection_name,  # dest collection name
                    batch_size,
                    batch_num,
                    *args,  # and finally *args passed to loading func
                ),
            )
            submitted.append(job)
            # raise error as soon as we know
            if got_error:
                raise got_error

            def batch_uploaded(f, name, batch_num):
                # again, no "self" here so remaining jobs can still be submitted
                nonlocal got_error
                try:
                    if type(f.result()) != int:
                        got_error = Exception(
                            "Batch #%s failed while uploading source '%s' [%s]" % (batch_num, name, f.result())
                        )
                except Exception as e:
                    got_error = e

            job.add_done_callback(partial(batch_uploaded, name=fullname, batch_num=batch_num))
        if submitted:
            await asyncio.gather(*submitted)
        if got_error:
            raise got_error
        self.switch_collection()
        self.clean_archived_collections()
class NoDataSourceUploader(BaseSourceUploader):
    """
    Uploader that stores no data and doesn't even assume any data exists
    (unlike DummySourceUploader on this point). Useful for instance when a
    mapping needs to be stored (get_mapping()) but data doesn't come from an
    actual upload (ie. generated).
    """

    storage_class = NoStorage

    async def update_data(self, batch_size, job_manager=None):
        """Nothing to do: this uploader carries no data."""
        self.logger.debug("No data to upload, skip")
[docs]
class UploaderManager(BaseSourceManager):
    """
    After registering datasources, manager will orchestrate source uploading.
    """

    # uploader base class used when discovering/registering source classes
    SOURCE_CLASS = BaseSourceUploader

    def __init__(self, poll_schedule=None, *args, **kwargs):
        """:param poll_schedule: schedule spec passed to poll() — see base class"""
        super(UploaderManager, self).__init__(*args, **kwargs)
        self.poll_schedule = poll_schedule
[docs]
def get_source_ids(self):
"""Return displayable list of registered source names (not private)"""
# skip private ones starting with __
# skip those deriving from bt.h.autoupdate.uploader.BiothingsUploader, they're used for autohub
# and considered internal (note: while there could be more than 1 uploader per source, when it's
# an autoupdate one, there's only one, so [0])
from biothings.hub.autoupdate.uploader import BiothingsUploader # prevent circular imports
registered = sorted(
[
src
for src, klasses in self.register.items()
if not src.startswith("__") and not issubclass(klasses[0], BiothingsUploader)
]
)
return registered
def __repr__(self):
return "<%s [%d registered]: %s>" % (
self.__class__.__name__,
len(self.register),
self.get_source_ids(),
)
[docs]
def clean_stale_status(self):
src_dump = get_src_dump()
srcs = src_dump.find()
for src in srcs:
jobs = src.get("upload", {}).get("jobs", {})
dirty = False
for subsrc in jobs:
if jobs[subsrc].get("status") == "uploading":
logging.warning("Found stale datasource '%s', marking upload status as 'canceled'", src["_id"])
jobs[subsrc]["status"] = "canceled"
dirty = True
if dirty:
src_dump.replace_one({"_id": src["_id"]}, src)
[docs]
def filter_class(self, klass):
if klass.name is None:
# usually a base defined in an uploader, which then is subclassed in same
# module. Kind of intermediate, not fully functional class
logging.debug("%s has no 'name' defined, skip it" % klass)
return None
else:
return klass
[docs]
def create_instance(self, klass):
inst = klass.create(db_conn_info=self.conn.address)
return inst
[docs]
def register_classes(self, klasses):
for klass in klasses:
config.supersede(klass) # monkey-patch from DB
if klass.main_source:
self.register.setdefault(klass.main_source, []).append(klass)
else:
self.register.setdefault(klass.name, []).append(klass)
[docs]
def upload_all(self, raise_on_error=False, **kwargs):
"""
Trigger upload processes for all registered resources.
`**kwargs` are passed to upload_src() method
"""
jobs = []
for src in self.register:
job = self.upload_src(src, **kwargs)
jobs.extend(job)
return asyncio.gather(*jobs)
[docs]
def upload_src(self, src, *args, **kwargs):
"""
Trigger upload for registered resource named 'src'.
Other args are passed to uploader's load() method
"""
try:
klasses = self[src]
except KeyError:
raise ResourceNotFound(f"Can't find '{src}' in registered sources (whether as main or sub-source)")
jobs = []
try:
for _, klass in enumerate(klasses):
kwargs["job_manager"] = self.job_manager
job = self.job_manager.submit(
# partial(self.create_and_load, klass, job_manager=self.job_manager, *args, **kwargs)
partial(self.create_and_load, klass, *args, **kwargs) # Fix Flake8 B026
)
jobs.append(job)
tasks = asyncio.gather(*jobs)
def done(f):
try:
# just consume the result to raise exception
# if there were an error... (what an api...)
f.result()
logging.info("success", extra={"notify": True})
except Exception as e:
logging.exception("failed: %s" % e, extra={"notify": True})
tasks.add_done_callback(done)
return jobs
except Exception as e:
logging.exception("Error while uploading '%s': %s" % (src, e), extra={"notify": True})
raise
[docs]
async def create_and_update_master(self, klass, dry=False):
compare_data = None
inst = self.create_instance(klass)
inst.prepare()
if dry:
compare_data = inst.get_current_and_new_master()
else:
inst.update_master()
inst.unprepare()
return compare_data
[docs]
async def create_and_load(self, klass, *args, **kwargs):
insts = self.create_instance(klass)
if type(insts) != list:
insts = [insts]
for inst in insts:
await inst.load(*args, **kwargs)
[docs]
def poll(self, state, func):
super(UploaderManager, self).poll(state, func, col=get_src_dump())
[docs]
def source_info(self, source=None):
src_dump = get_src_dump()
src_ids = self.get_source_ids()
if source:
if source in src_ids:
src_ids = [source]
else:
return None
res = []
cur = src_dump.find({"_id": {"$in": src_ids}})
bysrcs = {}
[bysrcs.setdefault(src["_id"], src) for src in cur]
for _id in src_ids:
src = bysrcs.get(_id, {})
uploaders = self.register[_id]
src.setdefault("upload", {})
for uploader in uploaders:
upl = {
"name": "%s.%s" % (inspect.getmodule(uploader).__name__, uploader.__name__),
"bases": [
"%s.%s" % (inspect.getmodule(k).__name__, k.__name__)
for k in uploader.__bases__
if inspect.getmodule(k)
],
"dummy": issubclass(uploader, DummySourceUploader),
}
src["upload"].setdefault("jobs", {}).setdefault(uploader.name, {})
src["upload"]["jobs"][uploader.name]["uploader"] = upl
src["name"] = _id
src["_id"] = _id
res.append(src)
if source:
if res:
return res.pop()
else:
# no information, just return what was passed to honor return type
# + minimal information
return {"name": source, "_id": source}
else:
return res
[docs]
def upload_info(self):
res = {}
for name, klasses in self.register.items():
res[name] = [klass.__name__ for klass in klasses]
return res
[docs]
def set_pending_to_upload(src_name):
    """Add "upload" to the source's pending actions in src_dump."""
    src_dump = get_src_dump()
    src_dump.update({"_id": src_name}, {"$addToSet": {"pending": "upload"}})