import importlib
import inspect
import json
import os
import pathlib
import pprint
import re
import subprocess
import sys
import textwrap
import urllib.parse
from string import Template
# we switched from yapf to black for code formatting
try:
import black
black_avail = True
except ImportError:
black_avail = False
import requests
import yaml
from biothings import config as btconfig
from biothings.hub.dataload.dumper import DockerContainerDumper, LastModifiedFTPDumper, LastModifiedHTTPDumper
from biothings.hub.dataplugin.manager import GitDataPlugin, ManualDataPlugin
from biothings.utils import storage
from biothings.utils.common import (
get_class_from_classpath,
get_plugin_name_from_local_manifest,
get_plugin_name_from_remote_manifest,
parse_folder_name_from_url,
rmdashfr,
)
from biothings.utils.hub_db import get_data_plugin, get_src_dump, get_src_master
from biothings.utils.loggers import get_logger
from biothings.utils.manager import BaseSourceManager
class AssistantException(Exception):
pass
class LoaderException(Exception):
pass
class BasePluginLoader(object):
loader_type = None # set in subclass
def __init__(self, plugin_name):
self.plugin_name = plugin_name
self.plugin_path_name = None # This will be set on loading step
self.setup_log()
self._plugin = None
def setup_log(self):
"""Setup and return a logger instance"""
log_folder = os.path.join(btconfig.LOG_FOLDER, "dataload") if btconfig.LOG_FOLDER else None
self.logger, self.logfile = get_logger("loader_%s" % self.plugin_name, log_folder=log_folder)
def get_plugin_obj(self):
if self._plugin:
return self._plugin
dp = get_data_plugin()
plugin = dp.find_one({"_id": self.plugin_name})
        if not plugin or not plugin.get("download", {}).get("data_folder"):
            raise LoaderException("Can't find data_folder, not available yet?")
self._plugin = plugin
return plugin
def invalidate_plugin(self, error):
self.logger.exception("Invalidate plugin '%s' because: %s" % (self.plugin_name, error))
        # flag all associated plugin classes (there should be only one, but no need to care here)
try:
for klass in self.__class__.data_plugin_manager[self.plugin_name]:
klass.data_plugin_error = error
except KeyError:
# plugin_name is not registered yet
pass
raise LoaderException(error)
def can_load_plugin(self):
"""
Return True if loader is able to load plugin (check data folder content)
"""
raise NotImplementedError("implement 'can_load_plugin' in subclass")
def load_plugin(self):
"""
Load plugin and register its components
"""
raise NotImplementedError("implement 'load_plugin' in subclass")
class ManifestBasedPluginLoader(BasePluginLoader):
loader_type = "manifest"
# should match a _dict_for_***
dumper_registry = {
"http": LastModifiedHTTPDumper,
"https": LastModifiedHTTPDumper,
"ftp": LastModifiedFTPDumper,
"docker": DockerContainerDumper,
}
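    # For example (illustrative URL), a manifest entry such as
    #   {"dumper": {"data_url": "https://example.org/data.json"}}
    # resolves to LastModifiedHTTPDumper, picked from this registry by URL scheme.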
def _dict_for_base(self, data_url):
        if isinstance(data_url, str):
data_url = [data_url]
return {
"SRC_NAME": self.plugin_name,
"SRC_ROOT_FOLDER": os.path.join(btconfig.DATA_ARCHIVE_ROOT, self.plugin_path_name),
"SRC_FOLDER_NAME": self.plugin_path_name,
"SRC_URLS": data_url,
}
def _dict_for_http(self, data_url):
return self._dict_for_base(data_url)
def _dict_for_https(self, data_url):
d = self._dict_for_http(data_url)
# not secure, but we want to make sure things will work as much as possible...
d["VERIFY_CERT"] = False
return d
def _dict_for_ftp(self, data_url):
return self._dict_for_base(data_url)
    def _dict_for_docker(self, data_url):
        return self._dict_for_base(data_url)
    def can_load_plugin(self):
        plugin = self.get_plugin_obj()
        df = pathlib.Path(plugin["download"]["data_folder"])
        return pathlib.Path(df, "manifest.json").exists() or pathlib.Path(df, "manifest.yaml").exists()
def load_plugin(self):
plugin = self.get_plugin_obj()
df = pathlib.Path(plugin["download"]["data_folder"])
        self.plugin_path_name = df.name
        if df.exists():
            mf = pathlib.Path(df, "manifest.json")
            mf_yaml = pathlib.Path(df, "manifest.yaml")
            manifest = None
            if mf.exists():
                self.logger.debug(f"Loading manifest: {mf}")
                with open(mf) as fin:
                    manifest = json.load(fin)
            elif mf_yaml.exists():
                self.logger.debug(f"Loading manifest: {mf_yaml}")
                with open(mf_yaml) as fin:
                    manifest = yaml.safe_load(fin)
if manifest:
try:
self.interpret_manifest(manifest, df.as_posix())
except Exception as e:
self.invalidate_plugin("Error loading manifest: %s" % str(e))
else:
self.logger.info("No manifest found for plugin: %s" % plugin["plugin"]["url"])
self.invalidate_plugin("No manifest found")
else:
self.invalidate_plugin("Missing plugin folder '%s'" % df)
def get_code_for_mod_name(self, mod_name):
"""
        Return the source code of the function referenced by mod_name (as an indented string literal), together with its name
Args:
mod_name: string with module name and function name, separated by colon
Returns:
Tuple[str, str]: containing
- indented string literal for the function specified
- name of the function
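
        Example (illustrative; assumes the plugin ships a ``parser.py`` module
        defining a ``load_data`` function)::

            code, funcname = self.get_code_for_mod_name("parser:load_data")
            # code -> indented source of load_data(), funcname -> "load_data"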
"""
try:
mod, funcname = map(str.strip, mod_name.split(":"))
except ValueError as e:
            raise AssistantException(
                "Wrong format for '%s', it must be defined following format 'module:func': %s" % (mod_name, e)
            )
modpath = self.plugin_path_name + "." + mod
try:
pymod = importlib.import_module(modpath)
# self.logger.info("Imported custom module %s for plugin %s", modpath, self.plugin_path_name)
except (ImportError, TypeError):
# Some data plugins use BioThings generic parser, e.g. CHEBI plugin uses {"parser" : "hub.dataload.data_parsers:load_obo"}
# In such cases, `self.plugin_path_name` is not part of the module path.
pymod = importlib.import_module(mod)
# self.logger.info("Imported generic module %s for plugin %s", mod, self.plugin_path_name)
# reload in case we need to refresh plugin's code
importlib.reload(pymod)
assert funcname in dir(pymod), "%s not found in module %s" % (funcname, pymod)
func = getattr(pymod, funcname)
# fetch source and indent to class method level in the template
strfunc = inspect.getsource(func)
# always indent with spaces, normalize to avoid mixed indenting chars
indentfunc = textwrap.indent(strfunc.replace("\t", " "), prefix=" ")
return indentfunc, funcname
def get_dumper_dynamic_class(self, dumper_section, metadata):
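        """
        Generate a dumper class from the manifest's "dumper" section: pick the base
        dumper class from the data_url scheme (or from the optional "class" key),
        fill the dumper template and exec the resulting code. The returned class
        also inherits AssistedDumper, defined in this module, so it can be pickled.
        """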
if dumper_section.get("data_url"):
            if not isinstance(dumper_section["data_url"], list):
                durls = [dumper_section["data_url"]]
            else:
                durls = dumper_section["data_url"]
            schemes = {urllib.parse.urlsplit(durl).scheme for durl in durls}
            # https == http as far as dumper generation is concerned
            if len({sch.replace("https", "http") for sch in schemes}) > 1:
                raise AssistantException(
                    "Manifest specifies URLs of different types (%s), expecting only one" % schemes
                )
scheme = schemes.pop()
if "docker" in scheme:
scheme = "docker"
klass = dumper_section.get("class")
confdict = getattr(self, "_dict_for_%s" % scheme)(durls)
if klass:
dumper_class = get_class_from_classpath(klass)
confdict["BASE_CLASSES"] = klass
else:
dumper_class = self.dumper_registry.get(scheme)
confdict["BASE_CLASSES"] = "biothings.hub.dataload.dumper.%s" % dumper_class.__name__
if not dumper_class:
raise AssistantException("No dumper class registered to handle scheme '%s'" % scheme)
if metadata:
confdict["__metadata__"] = metadata
else:
confdict["__metadata__"] = {}
if dumper_section.get("release"):
indentfunc, func = self.get_code_for_mod_name(dumper_section["release"])
assert func != "set_release", "'set_release' is a reserved method name, pick another name"
confdict[
"SET_RELEASE_FUNC"
] = """
%s
def set_release(self):
self.release = self.%s()
""" % (
indentfunc,
func,
)
else:
confdict["SET_RELEASE_FUNC"] = ""
dklass = None
            pnregex = r"^[A-Za-z_]\w+$"
            assert re.compile(pnregex).match(
                self.plugin_name
            ), "Incorrect plugin name '%s' (doesn't match regex '%s')" % (self.plugin_name, pnregex)
dumper_name = self.plugin_name.capitalize() + "Dumper"
"%s"
try:
if hasattr(btconfig, "DUMPER_TEMPLATE"):
tpl_file = btconfig.DUMPER_TEMPLATE
else:
# default: assuming in ..../biothings/hub/dataplugin/
curmodpath = os.path.realpath(__file__)
if scheme == "docker":
tpl_file = os.path.join(os.path.dirname(curmodpath), "docker_dumper.py.tpl")
else:
tpl_file = os.path.join(os.path.dirname(curmodpath), "dumper.py.tpl")
                with open(tpl_file) as fin:
                    tpl = Template(fin.read())
confdict["DUMPER_NAME"] = dumper_name
confdict["SRC_NAME"] = self.plugin_name
if dumper_section.get("schedule"):
schedule = """'%s'""" % dumper_section["schedule"]
else:
schedule = "None"
confdict["SCHEDULE"] = schedule
confdict["UNCOMPRESS"] = dumper_section.get("uncompress") or False
pystr = tpl.substitute(confdict)
                import types
                code = compile(pystr, "<string>", "exec")
                # exec the generated code in a throwaway module ("imp" was deprecated
                # and removed, types.ModuleType is the stdlib replacement)
                mod = types.ModuleType(self.plugin_name)
                exec(code, mod.__dict__, mod.__dict__)
dklass = getattr(mod, dumper_name)
# we need to inherit from a class here in this file so it can be pickled
assisted_dumper_class = type(
"AssistedDumper_%s" % self.plugin_name,
(
AssistedDumper,
dklass,
),
{},
)
assisted_dumper_class.python_code = pystr
return assisted_dumper_class
except Exception:
self.logger.exception("Can't generate dumper code for '%s'" % self.plugin_name)
raise
else:
raise AssistantException("Invalid manifest, expecting 'data_url' key in 'dumper' section")
def get_uploader_dynamic_class(self, uploader_section, metadata, sub_source_name=""):
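        """
        Generate an uploader class from the manifest's "uploader" section: resolve
        the "parser" function, the storage class ("on_duplicates") and the optional
        keylookup/parallelizer/mapping hooks, fill the uploader template and exec
        the resulting code. The returned class also inherits AssistedUploader,
        defined in this module, so it can be pickled.
        """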
if uploader_section.get("parser"):
uploader_name = self.plugin_name.capitalize() + sub_source_name + "Uploader"
confdict = {
"SRC_NAME": self.plugin_name,
"SUB_SRC_NAME": sub_source_name,
"UPLOADER_NAME": uploader_name,
}
try:
mod, func = uploader_section.get("parser").split(":")
# make sure the parser module is able to load
# otherwise, the error log should be shown in the UI
self.get_code_for_mod_name(uploader_section["parser"])
confdict["PARSER_MOD"] = mod
confdict["PARSER_FUNC"] = func
if uploader_section.get("parser_kwargs"):
parser_kwargs_serialized = repr(uploader_section["parser_kwargs"])
confdict["PARSER_FACTORY_CODE"] = textwrap.dedent(
f"""
# Setup parser to parser factory
from {mod} import {func} as parser_func
parser_kwargs = {parser_kwargs_serialized}
"""
)
else:
# create empty parser_kwargs to pass to parser_func
parser_kwargs_serialized = repr({})
confdict["PARSER_FACTORY_CODE"] = textwrap.dedent(
f"""
# when code is exported, import becomes relative
try:
from {self.plugin_path_name}.{mod} import {func} as parser_func
except ImportError:
try:
from .{mod} import {func} as parser_func
except ImportError:
# When relative import fails, try to import it directly
import sys
sys.path.insert(0, ".")
from {mod} import {func} as parser_func
parser_kwargs = {parser_kwargs_serialized}
"""
)
except ValueError:
raise AssistantException(
"'parser' must be defined as 'module:parser_func' but got: '%s'" % uploader_section["parser"]
)
try:
ondups = uploader_section.get("on_duplicates")
storage_class = storage.get_storage_class(ondups)
if "ignore_duplicates" in uploader_section:
raise AssistantException(
"'ignore_duplicates' key not supported anymore, use 'on_duplicates' : 'error|ignore|merge'"
)
confdict["STORAGE_CLASS"] = storage_class
# default is not ID conversion at all
confdict["IMPORT_IDCONVERTER_FUNC"] = ""
confdict["IDCONVERTER_FUNC"] = None
confdict["CALL_PARSER_FUNC"] = "parser_func(data_path, **parser_kwargs)"
if uploader_section.get("keylookup"):
assert self.__class__.keylookup, (
"Plugin %s needs _id conversion " % self.plugin_name + "but no keylookup instance was found"
)
self.logger.info("Keylookup conversion required: %s" % uploader_section["keylookup"])
klmod = inspect.getmodule(self.__class__.keylookup)
confdict["IMPORT_IDCONVERTER_FUNC"] = "from %s import %s" % (
klmod.__name__,
self.__class__.keylookup.__name__,
)
convargs = ",".join(["%s=%s" % (k, v) for k, v in uploader_section["keylookup"].items()])
confdict["IDCONVERTER_FUNC"] = "%s(%s)" % (
self.__class__.keylookup.__name__,
convargs,
)
confdict["CALL_PARSER_FUNC"] = "self.__class__.idconverter(parser_func)(data_path, **parser_kwargs)"
if metadata:
confdict["__metadata__"] = metadata
else:
confdict["__metadata__"] = {}
                if hasattr(btconfig, "UPLOADER_TEMPLATE"):
                    tpl_file = btconfig.UPLOADER_TEMPLATE
elif sub_source_name:
curmodpath = os.path.realpath(__file__)
tpl_file = os.path.join(os.path.dirname(curmodpath), "subuploader.py.tpl")
else:
# default: assuming in ..../biothings/hub/dataplugin/
curmodpath = os.path.realpath(__file__)
tpl_file = os.path.join(os.path.dirname(curmodpath), "uploader.py.tpl")
                with open(tpl_file) as fin:
                    tpl = Template(fin.read())
if uploader_section.get("parallelizer"):
indentfunc, func = self.get_code_for_mod_name(uploader_section["parallelizer"])
assert func != "jobs", "'jobs' is a reserved method name, pick another name"
confdict["BASE_CLASSES"] = "biothings.hub.dataload.uploader.ParallelizedSourceUploader"
confdict["IMPORT_FROM_PARALLELIZER"] = ""
confdict[
"JOBS_FUNC"
] = """
%s
def jobs(self):
return self.%s()
""" % (
indentfunc,
func,
)
else:
confdict["BASE_CLASSES"] = "biothings.hub.dataload.uploader.BaseSourceUploader"
confdict["JOBS_FUNC"] = ""
if uploader_section.get("mapping"):
indentfunc, func = self.get_code_for_mod_name(uploader_section["mapping"])
assert func != "get_mapping", "'get_mapping' is a reserved class method name, pick another name"
confdict[
"MAPPING_FUNC"
] = """
@classmethod
%s
@classmethod
def get_mapping(cls):
return cls.%s()
""" % (
indentfunc,
func,
)
else:
confdict["MAPPING_FUNC"] = ""
pystr = tpl.substitute(confdict)
                import types
                code = compile(pystr, "<string>", "exec")
                # exec the generated code in a throwaway module ("imp" was deprecated
                # and removed, types.ModuleType is the stdlib replacement)
                mod = types.ModuleType(self.plugin_name + sub_source_name)
                exec(code, mod.__dict__, mod.__dict__)
uklass = getattr(mod, uploader_name)
# we need to inherit from a class here in this file so it can be pickled
assisted_uploader_class = type(
"AssistedUploader_%s" % self.plugin_name + sub_source_name,
(
AssistedUploader,
uklass,
),
{},
)
assisted_uploader_class.python_code = pystr
return assisted_uploader_class
except Exception as e:
self.logger.exception("Error loading plugin: %s" % e)
raise AssistantException("Can't interpret manifest: %s" % e)
else:
raise AssistantException("Invalid manifest, expecting 'parser' key in 'uploader' section")
def get_uploader_dynamic_classes(self, uploader_section, metadata, data_plugin_folder):
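        """
        Generate one uploader class per entry of the "uploaders" manifest section
        (each entry follows the "uploader" syntax, plus a "name" key identifying
        the sub-source).
        """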
uploader_classes = []
for uploader_conf in uploader_section:
sub_source_name = uploader_conf.get("name", "")
uploader_class = self.get_uploader_dynamic_class(uploader_conf, metadata, sub_source_name)
uploader_class.DATA_PLUGIN_FOLDER = data_plugin_folder
# register class in module so it can be pickled easily
            sys.modules["biothings.hub.dataplugin.assistant"].__dict__[
                "AssistedUploader_%s%s" % (self.plugin_name, sub_source_name)
            ] = uploader_class
uploader_classes.append(uploader_class)
return uploader_classes
def interpret_manifest(self, manifest, data_plugin_folder):
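        """
        Interpret the manifest and generate/register dumper and uploader classes
        accordingly. A minimal manifest (an illustrative sketch showing only keys
        this loader consumes; the URL, package name and parser path below are
        placeholders) could look like::

            {
                "requires": ["some-pypi-package"],
                "dumper": {
                    "data_url": "https://example.org/data.json",
                    "uncompress": false
                },
                "uploader": {
                    "parser": "parser:load_data",
                    "on_duplicates": "error"
                }
            }
        """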
# start with requirements before importing anything
if manifest.get("requires"):
reqs = manifest["requires"]
            if not isinstance(reqs, list):
reqs = [reqs]
for req in reqs:
self.logger.info("Install requirement '%s'" % req)
subprocess.check_call([sys.executable, "-m", "pip", "install", req])
if manifest.get("dumper"):
assisted_dumper_class = self.get_dumper_dynamic_class(manifest["dumper"], manifest.get("__metadata__"))
assisted_dumper_class.DATA_PLUGIN_FOLDER = data_plugin_folder
self.__class__.dumper_manager.register_classes([assisted_dumper_class])
# register class in module so it can be pickled easily
sys.modules["biothings.hub.dataplugin.assistant"].__dict__[
"AssistedDumper_%s" % self.plugin_name
] = assisted_dumper_class
if manifest.get("uploader"):
assisted_uploader_class = self.get_uploader_dynamic_class(
manifest["uploader"], manifest.get("__metadata__")
)
assisted_uploader_class.DATA_PLUGIN_FOLDER = data_plugin_folder
self.__class__.uploader_manager.register_classes([assisted_uploader_class])
# register class in module so it can be pickled easily
sys.modules["biothings.hub.dataplugin.assistant"].__dict__[
"AssistedUploader_%s" % self.plugin_name
] = assisted_uploader_class
if manifest.get("uploaders"):
assisted_uploader_classes = self.get_uploader_dynamic_classes(
manifest["uploaders"], manifest.get("__metadata__"), data_plugin_folder
)
self.__class__.uploader_manager.register_classes(assisted_uploader_classes)
if manifest.get("display_name"):
dp = get_data_plugin()
dp.update(
{"_id": self.plugin_name},
{
"$set": {
"plugin.display_name": manifest.get("display_name"),
}
},
)
if manifest.get("biothing_type"):
dp = get_data_plugin()
dp.update(
{"_id": self.plugin_name},
{
"$set": {
"plugin.biothing_type": manifest.get("biothing_type"),
}
},
)
class AdvancedPluginLoader(BasePluginLoader):
loader_type = "advanced"
def can_load_plugin(self):
plugin = self.get_plugin_obj()
df = plugin["download"]["data_folder"]
if "__init__.py" in os.listdir(df):
return True
else:
return False
def load_plugin(self):
plugin = self.get_plugin_obj()
df = plugin["download"]["data_folder"]
if os.path.exists(df):
# we assume there's a __init__ module exposing Dumper and Uploader classes
# as necessary
            modpath = os.path.basename(df)
# before registering, process optional requirements.txt
reqfile = os.path.join(df, "requirements.txt")
if os.path.exists(reqfile):
self.logger.info("Installing requirements from %s for plugin '%s'" % (reqfile, self.plugin_name))
subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", reqfile])
# submit to managers to register datasources
self.logger.info("Registering '%s' to dump/upload managers" % modpath)
# register dumpers if any
try:
self.__class__.dumper_manager.register_source(modpath)
except Exception as e:
self.logger.info("Couldn't register dumper from module '%s': %s" % (modpath, e))
# register uploaders if any
try:
self.__class__.uploader_manager.register_source(modpath)
except Exception as e:
self.logger.info("Couldn't register uploader from module '%s': %s" % (modpath, e))
else:
self.invalidate_plugin("Missing plugin folder '%s'" % df)
class BaseAssistant(object):
    plugin_type = None  # to be defined in subclass
data_plugin_manager = None # set by assistant manager
dumper_manager = None # set by assistant manager
uploader_manager = None # set by assistant manager
keylookup = None # set by assistant manager
# known plugin loaders
loaders = {
"manifest": ManifestBasedPluginLoader,
"advanced": AdvancedPluginLoader,
}
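    # note: dict order matters, the first loader able to interpret the plugin
    # content wins (see the "loader" property below)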
def __init__(self, url):
self.url = url
self._plugin_name = None
self._src_folder = None
self._loader = None
self.logfile = None
self.logger = None
self.setup_log()
def setup_log(self):
"""Setup and return a logger instance"""
self.logger, self.logfile = get_logger("assistant_%s" % self.__class__.plugin_type)
def register_loader(self):
dp = get_data_plugin()
dp.update(
{"_id": self.plugin_name},
{"$set": {"plugin.loader": self.loader.loader_type}},
upsert=True,
)
@property
def loader(self):
"""
Return loader object able to interpret plugin's folder content
"""
if not self._loader:
# iterate over known loaders, the first one which can interpret plugin content is kept
for klass in self.loaders.values():
# propagate managers
klass.dumper_manager = self.dumper_manager
klass.uploader_manager = self.uploader_manager
klass.data_plugin_manager = self.data_plugin_manager
klass.keylookup = self.keylookup
loader = klass(self.plugin_name)
if loader.can_load_plugin():
self._loader = loader
self.logger.debug(
'For plugin "%s", selecting loader class "%s"',
self.plugin_name,
self._loader.__class__.__name__,
)
self.register_loader()
break
else:
self.logger.debug('Loader %s cannot load plugin "%s"', loader, self.plugin_name)
continue
return self._loader
@property
def plugin_name(self):
"""
        Return the plugin name parsed from self.url, and set self._src_folder to
        the path of the folder containing the data plugin source code
"""
raise NotImplementedError("implement 'plugin_name' in subclass")
def handle(self):
"""Access self.url and do whatever is necessary to bring code to life within the hub...
        (hint: that may involve creating a dumper on-the-fly and registering that
        dumper with a manager...)
"""
raise NotImplementedError("implement 'handle' in subclass")
def can_handle(self):
"""Return true if assistant can handle the code"""
raise NotImplementedError("implement 'can_handle' in subclass")
def load_plugin(self):
"""
Load plugin and register its components
"""
raise NotImplementedError("implement 'load_plugin' in subclass")
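# These mixins exist only so the dynamically generated dumper/uploader classes
# inherit from a class defined in this module, which makes them picklable.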
class AssistedDumper(object):
DATA_PLUGIN_FOLDER = None
class AssistedUploader(object):
DATA_PLUGIN_FOLDER = None
class GithubAssistant(BaseAssistant):
plugin_type = "github"
@property
def plugin_name(self):
folder_name = parse_folder_name_from_url(self.url)
if not self._plugin_name:
self._src_folder = os.path.join(btconfig.DATA_PLUGIN_FOLDER, folder_name)
            # Try to load the plugin name from the local manifest first; if it exists, we are
            # working with a plugin that has already been cloned (and possibly updated).
            # If no local name is found, the plugin hasn't been cloned yet, so try to fetch
            # its name from the remote manifest on GitHub; otherwise fall back to the folder name.
            plugin_name = get_plugin_name_from_local_manifest(self._src_folder)
if not plugin_name:
plugin_name = get_plugin_name_from_remote_manifest(self.url)
if not plugin_name:
plugin_name = folder_name
self._plugin_name = plugin_name
return self._plugin_name
def can_handle(self):
        # analyze headers to guess the type of required assistant
try:
headers = requests.head(self.url).headers
if headers.get("server").lower() == "github.com":
return True
except Exception as e:
self.logger.error("%s plugin can't handle URL '%s': %s" % (self.plugin_type, self.url, e))
return False
def get_classdef(self):
# generate class dynamically and register
confdict = {
"SRC_NAME": self.plugin_name,
"GIT_REPO_URL": self.url,
"SRC_ROOT_FOLDER": self._src_folder,
}
# TODO: store confdict in hubconf collection
k = type("AssistedGitDataPlugin_%s" % self.plugin_name, (GitDataPlugin,), confdict)
return k
def handle(self):
assert self.__class__.data_plugin_manager, "Please set data_plugin_manager attribute"
klass = self.get_classdef()
self.__class__.data_plugin_manager.register_classes([klass])
class LocalAssistant(BaseAssistant):
plugin_type = "local"
@property
def plugin_name(self):
if not self._plugin_name:
split = urllib.parse.urlsplit(self.url)
# format local://pluginname so it's in hostname.
            # if path is set, it means the format is local://subdir/pluginname
            # and we don't support that for import reasons (we would need to add
            # .../plugins/subdir to sys.path, which is not impossible but might have
            # side effects, so for now we stay on the safe side; also, let's remember the
            # 1st version of MS-DOS didn't support subdirs, so I guess we're on the right path :)
assert not split.path, "It seems URL '%s' references a sub-directory (%s)," % (
self.url,
split.hostname,
) + " with plugin name '%s', sub-directories are not supported (yet)" % split.path.strip("/")
# don't use hostname here because it's lowercased, netloc isn't
# (and we're matching directory names on the filesystem, it's case-sensitive)
src_folder_name = os.path.basename(split.netloc)
try:
self._plugin_name = get_plugin_name_from_local_manifest(
os.path.join(btconfig.DATA_PLUGIN_FOLDER, src_folder_name)
) or src_folder_name
except Exception as ex:
self.logger.exception(ex)
self._plugin_name = src_folder_name
self._src_folder = os.path.join(btconfig.DATA_PLUGIN_FOLDER, src_folder_name)
return self._plugin_name
def can_handle(self):
        return self.url.startswith(self.__class__.plugin_type + "://")
def get_classdef(self):
# generate class dynamically and register
confdict = {"SRC_NAME": self.plugin_name, "SRC_ROOT_FOLDER": self._src_folder}
k = type("AssistedManualDataPlugin_%s" % self.plugin_name, (ManualDataPlugin,), confdict)
return k
def handle(self):
assert self.__class__.data_plugin_manager, "Please set data_plugin_manager attribute"
klass = self.get_classdef()
self.__class__.data_plugin_manager.register_classes([klass])
class AssistantManager(BaseSourceManager):
def __init__(
self,
data_plugin_manager,
dumper_manager,
uploader_manager,
keylookup=None,
default_export_folder="hub/dataload/sources",
*args,
**kwargs,
):
super(AssistantManager, self).__init__(*args, **kwargs)
self.data_plugin_manager = data_plugin_manager
self.dumper_manager = dumper_manager
self.uploader_manager = uploader_manager
self.keylookup = keylookup
if not os.path.exists(btconfig.DATA_PLUGIN_FOLDER):
os.makedirs(btconfig.DATA_PLUGIN_FOLDER)
self.default_export_folder = default_export_folder
# register data plugin folder in python path so we can import
# plugins (sub-folders) as packages
sys.path.insert(0, btconfig.DATA_PLUGIN_FOLDER)
self.logfile = None
self.setup_log()
def setup_log(self):
"""Setup and return a logger instance"""
self.logger, self.logfile = get_logger("assistantmanager")
def create_instance(self, klass, url):
return klass(url)
def register_classes(self, klasses):
for klass in klasses:
klass.data_plugin_manager = self.data_plugin_manager
klass.dumper_manager = self.dumper_manager
klass.uploader_manager = self.uploader_manager
klass.keylookup = self.keylookup
self.register[klass.plugin_type] = klass
def submit(self, url):
        # submit url to all registered assistants (in order)
        # and return the first one claiming it can handle that URL
for typ in self.register:
aklass = self.register[typ]
inst = self.create_instance(aklass, url)
if inst.can_handle():
return inst
return None
def unregister_url(self, url=None, name=None):
dp = get_data_plugin()
if url:
url = url.strip()
doc = dp.find_one({"plugin.url": url})
        elif name:
            doc = dp.find_one({"_id": name})
        else:
            raise ValueError("Specify 'url' or 'name'")
        if not doc:
            raise AssistantException("Plugin is not registered (url=%s, name=%s)" % (url, name))
        if not url:
            url = doc["plugin"]["url"]
# should be only one but just in case
dp.remove({"_id": doc["_id"]})
        # delete plugin code so it won't be auto-registered
        # by the 'local' plugin assistant (issue studio #7)
if doc.get("download", {}).get("data_folder"):
codefolder = doc["download"]["data_folder"]
self.logger.info("Delete plugin source code in '%s'" % codefolder)
rmdashfr(codefolder)
assistant = self.submit(url)
try:
self.data_plugin_manager.register.pop(assistant.plugin_name)
except KeyError:
raise AssistantException("Plugin '%s' is not registered" % url)
self.dumper_manager.register.pop(assistant.plugin_name, None)
self.uploader_manager.register.pop(assistant.plugin_name, None)
def register_url(self, url):
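        """
        Register a data plugin from its URL (e.g. a GitHub repository URL, or a
        "local://<plugin_folder>" URL for plugins living in DATA_PLUGIN_FOLDER),
        then trigger its download and loading.
        """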
url = url.strip()
dp = get_data_plugin()
folder_name = parse_folder_name_from_url(url)
if dp.find_one({"plugin.url": url}) or dp.find_one(
{"download.data_folder": f"{btconfig.DATA_PLUGIN_FOLDER}/{folder_name}"}
):
self.logger.info("Plugin '%s' already registered" % url)
return
assistant = self.submit(url)
self.logger.info("For data-plugin URL '%s', selected assistant is: %s" % (url, assistant))
if assistant:
# register plugin info
            # if a github url was used, by default we assume it's a manifest-based plugin
            # (we can't know until we have a look at the content), so the assistant will
            # use the manifest-based loader. If that fails, the advanced loader will be
            # used to try again.
dp.update(
{"_id": assistant.plugin_name},
{"$set": {"plugin": {"url": url, "type": assistant.plugin_type, "active": True}}},
upsert=True,
)
assistant.handle()
job = self.data_plugin_manager.load(assistant.plugin_name)
assert len(job) == 1, "Expecting one job, got: %s" % job
job = job.pop()
def loaded(f):
try:
_ = f.result()
self.logger.debug("Plugin '%s' downloaded, now loading manifest" % assistant.plugin_name)
assistant.loader.load_plugin()
except Exception as e:
self.logger.exception("Unable to download plugin '%s': %s" % (assistant.plugin_name, e))
job.add_done_callback(loaded)
return job
else:
raise AssistantException("Could not find any assistant able to handle URL '%s'" % url)
def load_plugin(self, plugin):
ptype = plugin["plugin"]["type"]
url = plugin["plugin"]["url"]
if not plugin["plugin"]["active"]:
self.logger.info("Data plugin '%s' is deactivated, skip" % url)
return
self.logger.info("Loading data plugin '%s' (type: %s)" % (plugin["_id"], ptype))
if ptype in self.register:
try:
aklass = self.register[ptype]
assistant = self.create_instance(aklass, url)
assistant.handle()
assistant.loader.load_plugin()
except Exception as e:
self.logger.exception("Unable to load plugin '%s': %s" % (url, e))
else:
raise AssistantException("Unknown data plugin type '%s'" % ptype)
def load(self, autodiscover=True):
"""
Load plugins registered in internal Hub database and generate/register
dumpers & uploaders accordingly.
If autodiscover is True, also search DATA_PLUGIN_FOLDER for existing
plugin directories not registered yet in the database, and register
them automatically.
"""
plugin_dirs = []
if autodiscover:
try:
plugin_dirs = os.listdir(btconfig.DATA_PLUGIN_FOLDER)
except FileNotFoundError as e:
raise AssistantException("Invalid DATA_PLUGIN_FOLDER: %s" % e)
dp = get_data_plugin()
cur = dp.find()
for plugin in cur:
try:
plugin_dir_name = os.path.basename(plugin["download"]["data_folder"])
except Exception as e:
self.logger.warning("Couldn't load plugin '%s': %s" % (plugin["_id"], e))
continue
plugin_name = get_plugin_name_from_local_manifest(plugin.get("download").get("data_folder"))
if plugin_name and plugin["_id"] != plugin_name:
plugin = self.update_plugin_name(plugin, plugin_name)
            # remove plugin from folder list if already registered
if plugin_dir_name in plugin_dirs:
plugin_dirs.remove(plugin_dir_name)
try:
self.load_plugin(plugin)
except Exception as e:
self.logger.warning("Couldn't load plugin '%s': %s" % (plugin["_id"], e))
continue
        # any still unregistered? (note: the list is always empty if autodiscover=False)
        if plugin_dirs:
            for pdir in plugin_dirs:
try:
self.logger.info("Found unregistered manifest-based plugin '%s', auto-register it" % pdir)
self.register_url(f"local://{pdir}")
except Exception as e:
self.logger.exception("Couldn't auto-register plugin '%s': %s" % (pdir, e))
continue
def update_plugin_name(self, plugin, new_name):
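        """
        Rename a plugin: move its data_plugin, src_dump and src_master documents
        from the old _id over to the new name.
        """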
dp = get_data_plugin()
old_name = plugin.pop("_id")
dp.update({"_id": new_name}, {"$set": plugin}, upsert=True)
dp.remove({"_id": old_name})
plugin["_id"] = new_name
src_dump_db = get_src_dump()
src_dump_doc = src_dump_db.find_one({"_id": old_name})
if src_dump_doc:
src_dump_doc.pop("_id")
src_dump_db.update({"_id": new_name}, {"$set": src_dump_doc}, upsert=True)
src_dump_db.remove({"_id": old_name})
src_master_db = get_src_master()
src_master_doc = src_master_db.find_one({"_id": old_name})
if src_master_doc:
src_master_doc.pop("_id")
src_master_doc["name"] = new_name
src_master_db.update({"_id": new_name}, {"$set": src_master_doc}, upsert=True)
src_master_db.remove({"_id": old_name})
return plugin
def export_dumper(self, plugin_name, folder):
res = {"dumper": {"status": None, "file": None, "class": None, "message": None}}
try:
dclass = self.dumper_manager[plugin_name]
        except KeyError:
            res["dumper"]["status"] = "warning"
            res["dumper"]["message"] = "No dumper found for plugin '%s'" % plugin_name
            return res
try:
dumper_name = plugin_name.capitalize() + "Dumper"
self.logger.debug("Exporting dumper %s" % dumper_name)
assert len(dclass) == 1, "More than one dumper found: %s" % dclass
dclass = dclass[0]
assert hasattr(dclass, "python_code"), "No generated code found"
dinit = os.path.join(folder, "__init__.py")
dfile = os.path.join(folder, "dump.py")
            # the __init__ file is (re)created by export(), we'll append import code to it
            # we now use black for code formatting (switched from yapf), as autopep8
            # (for instance) doesn't give good results in terms of indentation
            # (e.g. the input_type list for keylookup)
if black_avail:
beauty = black.format_str(dclass.python_code, mode=black.Mode())
else:
raise ImportError('"black" package is required for exporting formatted code.')
with open(dfile, "w") as fout:
fout.write(beauty)
with open(dinit, "a") as fout:
fout.write("from .dump import %s\n" % dumper_name)
res["dumper"]["status"] = "ok"
res["dumper"]["file"] = dfile
res["dumper"]["class"] = dumper_name
        except Exception as e:
            res["dumper"]["status"] = "error"
            res["dumper"]["message"] = "Error exporting dumper: %s" % e
        return res
def export_uploader(self, plugin_name, folder):
res = {"uploader": {"status": None, "file": [], "class": [], "message": None}}
try:
uclasses = self.uploader_manager[plugin_name]
except KeyError:
res["uploader"]["status"] = "warning"
res["uploader"]["message"] = "No uploader found for plugin '%s'" % plugin_name
return res
status = "ok"
message = ""
for uclass in uclasses:
try:
uploader_name = uclass.__name__.split("_")[1].capitalize() + "Uploader"
self.logger.debug("Exporting uploader %s" % uploader_name)
assert hasattr(uclass, "python_code"), "No generated code found"
dinit = os.path.join(folder, "__init__.py")
mod_name = f"{uclass.__name__.split('_')[1]}_upload"
ufile = os.path.join(folder, mod_name + ".py")
                # we now use black for code formatting (switched from yapf)
if black_avail:
beauty = black.format_str(uclass.python_code, mode=black.Mode())
else:
raise ImportError('"black" package is required for exporting formatted code.')
with open(ufile, "w") as fout:
fout.write(beauty)
with open(dinit, "a") as fout:
fout.write(f"from .{mod_name} import %s\n" % uploader_name)
res["uploader"]["file"].append(ufile)
res["uploader"]["class"].append(uploader_name)
except Exception as e:
status = "error"
message = "Error exporting uploader: %s" % e
res["uploader"]["status"] = status
res["uploader"]["message"] = message
return res
def export_mapping(self, plugin_name, folder):
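        """
        Export the mapping for a plugin. If the manifest embeds a custom mapping,
        it is exported along with the uploader code and skipped here ("custom"
        origin); otherwise the mapping is taken from src_master ("registered")
        or from inspection results ("inspection").
        """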
res = {"mapping": {"status": None, "file": None, "message": None, "origin": None}}
# first check if plugin defines a custom mapping in manifest
# if that's the case, we don't need to export mapping there
# as it'll be exported with "uploader" code
plugindoc = get_data_plugin().find_one({"_id": plugin_name})
assert plugindoc, "Can't find plugin named '%s'" % plugin_name
plugin_folder = plugindoc.get("download", {}).get("data_folder")
assert plugin_folder, "Can't find plugin folder for '%s'" % plugin_name
try:
            with open(os.path.join(plugin_folder, "manifest.json")) as fin:
                manifest = json.load(fin)
if "mapping" in manifest.get("uploader", {}):
res["mapping"]["message"] = "Custom mapping included in uploader export"
res["mapping"]["status"] = "warning"
res["mapping"]["origin"] = "custom"
return res
except Exception as e:
self.logger.error("Can't read manifest while exporting code: %s" % e)
# try to export mapping from src_master (official)
doc = get_src_master().find_one({"_id": plugin_name})
if doc:
mapping = doc.get("mapping")
res["mapping"]["origin"] = "registered"
else:
doc = get_src_dump().find_one({"_id": plugin_name})
mapping = doc and doc.get("inspect", {}).get("jobs", {}).get(plugin_name, {}).get("inspect", {}).get(
"results", {}
).get("mapping")
res["mapping"]["origin"] = "inspection"
if not mapping:
res["mapping"]["origin"] = None
res["mapping"]["status"] = "warning"
res["mapping"]["message"] = "Can't find registered or generated (inspection) mapping"
return res
else:
ufile = os.path.join(folder, "upload.py")
            # we now use black for code formatting (switched from yapf)
            if black_avail:
                strmap = black.format_str(pprint.pformat(mapping), mode=black.Mode())
            else:
                raise ImportError('"black" package is required for exporting formatted code.')
with open(ufile, "a") as fout:
fout.write(
"""
@classmethod
def get_mapping(klass):
return %s\n"""
                    % textwrap.indent(strmap, prefix=" " * 2)
)
res["mapping"]["file"] = ufile
res["mapping"]["status"] = "ok"
return res
def export(
self,
plugin_name,
folder=None,
what=["dumper", "uploader", "mapping"], # noqa: B006
purge=False,
):
"""
Export generated code for a given plugin name, in given folder
(or use DEFAULT_EXPORT_FOLDER if None). Exported information can be:
- dumper: dumper class generated from the manifest
- uploader: uploader class generated from the manifest
- mapping: mapping generated from inspection or from the manifest
If "purge" is true, any existing folder/code will be deleted first, otherwise,
will raise an error if some folder/files already exist.
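
        Example (illustrative; "mysrc" is a placeholder plugin name)::

            assistant_manager.export("mysrc", what=["dumper", "uploader"], purge=True)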
"""
res = {}
# sanity checks
        if isinstance(what, str):
what = [what]
folder = folder or self.default_export_folder
assert os.path.exists(folder), "Folder used to export code doesn't exist: %s" % os.path.abspath(folder)
assert plugin_name # avoid deleting the whole export folder when purge=True...
dp = get_data_plugin()
        plugin = dp.find_one({"_id": plugin_name})
        if not plugin:
            raise AssistantException(f"Data plugin {plugin_name} does not exist!")
        plugin_path_name = os.path.basename(plugin["download"]["data_folder"])
folder = os.path.join(folder, plugin_path_name)
if purge:
rmdashfr(folder)
if not os.path.exists(folder):
os.makedirs(folder)
elif not purge:
raise FileExistsError("Folder '%s' already exists, use purge=True" % folder)
dinit = os.path.join(folder, "__init__.py")
with open(dinit, "w") as fout:
fout.write("")
if "dumper" in what:
res.update(self.export_dumper(plugin_name, folder))
if "uploader" in what:
res.update(self.export_uploader(plugin_name, folder))
if "mapping" in what:
assert "uploader" in what, "'uploader' needs to be exported too to export mapping"
res.update(self.export_mapping(plugin_name, folder))
# there's also at least a parser module, maybe a release module, and some more
# dependencies, indirect, not listed in the manifest. We'll just copy everything from
# the plugin folder to the export folder
plugin_folder = os.path.join(btconfig.DATA_PLUGIN_FOLDER, plugin_path_name)
for f in os.listdir(plugin_folder):
src = os.path.join(plugin_folder, f)
dst = os.path.join(folder, f)
# useless or strictly plugin-machinery-specific, skip
if f in ["__pycache__", "manifest.json", "__init__.py"] or f.startswith("."):
self.logger.debug("Skipping '%s', not necessary" % src)
continue
self.logger.debug("Copying %s to %s" % (src, dst))
try:
with open(src) as fin:
with open(dst, "w") as fout:
fout.write(fin.read())
except IsADirectoryError:
self.logger.error("%s is a directory, expecting only files to copy" % src)
continue
return res