# from http://asyncssh.readthedocs.io/en/latest/#id13
# To run this program, the file ``ssh_host_key`` must exist with an SSH
# private key in it to use as a server host key.
import asyncio
import copy
import datetime
import io
import json
import os
import pydoc
import re
import sys
import time
from collections import OrderedDict, UserDict
from functools import partial
from pprint import pformat
try:
    from IPython import InteractiveShell
except ImportError:
    # Suppress import error when we just run CLI
    InteractiveShell = object
from biothings import config
from biothings.utils.common import timesofar
from biothings.utils.dataload import to_boolean
from biothings.utils.docs import flatten_doc
from biothings.utils.hub_db import backup, get_cmd, get_last_command, restore
from biothings.utils.loggers import ShellLogger
from biothings.utils.redirect_streams import RedirectStdStreams
if hasattr(config, "logger"):
    logging = config.logger
else:
    import logging
# useful variables to bring into hub namespace
pending = "pending"
HUB_ENV = hasattr(config, "HUB_ENV") and config.HUB_ENV or ""  # default: prod (or "normal")
VERSIONS = HUB_ENV and "%s-versions" % HUB_ENV or "versions"
LATEST = HUB_ENV and "%s-latest" % HUB_ENV or "latest"
[docs]
def jsonreadify(cmd):
    newcmd = copy.copy(cmd)
    newcmd.pop("jobs")
    # try to make the data structure of the returned
    # results for each command is hubdb compatible.
    if "results" in newcmd:
        results = []
        for result in newcmd.pop("results"):
            if isinstance(result, UserDict):
                results.append(result.data)
            #
            # elif isinstance(result, ...):
            #     pass # add more here
            #
            else:  # already compliant
                results.append(result)
        newcmd["results"] = results
    return newcmd 
##############
# HUB SERVER #
##############
[docs]
class AlreadyRunningException(Exception):
    pass 
[docs]
class CommandError(Exception):
    pass 
[docs]
class NoSuchCommand(Exception):
    pass 
[docs]
class CommandNotAllowed(Exception):
    pass 
[docs]
class CommandDefinition(dict):
    pass 
[docs]
class HubShell(InteractiveShell):
    launched_commands = {}
    pending_outputs = {}
    cmd_cnt = None
    cmd = None  # "cmd" collection
    def __init__(self, job_manager):
        self.job_manager = job_manager
        self.commands = OrderedDict()
        self.managers = {}
        self.extra_ns = OrderedDict()
        self.tracked = {}  # command calls kept in history or not
        self.hidden = {}  # not returned by help()
        self.origout = sys.stdout
        self.buf = io.StringIO()
        self.shellog = ShellLogger(name="shell")
        # there should be only one shell instance (todo: singleton)
        self.__class__.cmd = get_cmd()
        self.__class__.set_command_counter()
        self.last_std_contents = None
        super(HubShell, self).__init__(user_ns=self.extra_ns)
[docs]
    @classmethod
    def set_command_counter(cls):
        assert cls.cmd, "No cmd collection set"
        try:
            res = get_last_command()
            if res:
                logging.debug("Last launched command ID: %s", res["_id"])
                cls.cmd_cnt = int(res["_id"]) + 1
            else:
                logging.info("No previously stored command found, set counter to 1")
                cls.cmd_cnt = 1
        except StopIteration:
            logging.info("Can't find highest command number, assuming starting from scratch")
            cls.cmd_cnt = 1 
[docs]
    def set_commands(self, basic_commands, *extra_ns):
        def register(commands, hidden=False):
            for name, cmd in commands.items():
                if name in self.commands:
                    raise CommandError("Command defined multiple time: %s" % name)
                # if type(cmd) == CommandDefinition:      # TODO: remove this line
                if isinstance(cmd, CommandDefinition):
                    try:
                        self.commands[name] = cmd["command"]
                        self.tracked[name] = cmd.get("tracked", True)
                        self.hidden[name] = cmd.get("hidden", False) or hidden
                    except KeyError as e:
                        raise CommandError(
                            "Could not register command because missing '%s' in definition: %s" % (e, cmd)
                        )
                else:
                    self.commands[name] = cmd
                    self.hidden[name] = hidden
                # update original passed commands to caller knows what's been done there
                if name not in basic_commands:
                    basic_commands[name] = cmd
        # update with ssh server default commands
        register(basic_commands)
        # don't track this calls
        register({"restart": CommandDefinition(command=self.restart, track=True)})
        register({"stop": CommandDefinition(command=self.stop, track=True)})
        register({"backup": CommandDefinition(command=backup, track=True)})
        register({"restore": CommandDefinition(command=restore, track=True)})
        register({"help": CommandDefinition(command=self.help, track=False)})
        register({"commands": CommandDefinition(command=self.command_info, tracked=False)})
        register(
            {
                "command": CommandDefinition(
                    command=lambda id, *args, **kwargs: self.command_info(id=id, *args, **kwargs),
                    tracked=False,
                )
            }
        )
        for extra in extra_ns:
            # don't expose extra commands, they're kind of private/advanced
            register(extra, hidden=True)
        # self.extra_ns["cancel"] = self.__class__.cancel
        # for boolean calls
        self.extra_ns["_and"] = _and
        self.extra_ns["partial"] = partial
        self.extra_ns["hub"] = self
        # merge official/public commands with hidden/private to
        # make the whole available in shell's namespace
        self.extra_ns.update(self.commands) 
        # Note: there's no need to update shell namespace as self.extra_ns
        # has been passed by ref in __init__() so things get updated automagically
        # (self.user_ns.update(...) can be used otherwise, self.user_ns is IPython
        # internal namespace dict
[docs]
    def stop(self, force=False):
        return self.restart(force=force, stop=True) 
[docs]
    def restart(self, force=False, stop=False):
        async def do():
            try:
                if stop:
                    event = "hub_stop"
                    msg = "Hub is stopping"
                else:
                    event = "hub_restart"
                    msg = "Hub is restarting"
                logging.critical(
                    json.dumps({"type": "alert", "msg": msg, "event": event}),
                    extra={"event": True},
                )
                logging.info("Stopping job manager...")
                j = self.job_manager.stop(force=force)
                def ok(f):
                    f.result()  # consume
                    logging.error("Job manager stopped")
                j.add_done_callback(ok)
                await j
            except Exception as e:
                logging.error("Error while recycling the process queue: %s", e)
                raise
        def start(f):
            f.result()  # consume future's result to potentially raise exception
            logging.debug("%s %s", [sys.executable], sys.argv)
            import subprocess
            subprocess.Popen([sys.executable] + sys.argv)
            self.job_manager.hub_process.kill()
            sys.exit(0)
        def autokill(f):
            f.result()
            self.job_manager.hub_process.kill()
        fut = asyncio.ensure_future(do())
        if stop:
            logging.warning("Stopping hub")
            fut.add_done_callback(autokill)
        else:
            logging.warning("Restarting hub")
            fut.add_done_callback(start)
        return fut 
[docs]
    def help(self, func=None):
        """
        Display help on given function/object or list all available commands
        """
        if not func:
            cmds = "\nAvailable commands:\n\n"
            for k in self.commands:
                if self.hidden[k]:
                    continue
                cmds += "\t%s\n" % k
            cmds += "\nType: 'help(command)' for more\n"
            return cmds
        elif isinstance(func, partial):
            docstr = "\n" + pydoc.render_doc(func.func, title="Hub documentation: %s")
            docstr += "\nDefined et as a partial, with:\nargs:%s\nkwargs:%s\n" % (
                repr(func.args),
                repr(func.keywords),
            )
            return docstr
        elif isinstance(func, CompositeCommand):
            docstr = "\nComposite command:\n\n%s\n" % func
            return docstr
        else:
            try:
                return "\n" + pydoc.render_doc(func, title="Hub documentation: %s")
            except ImportError:
                return "\nHelp not available for this command\n" 
[docs]
    def launch(self, pfunc):
        """
        Helper to run a command and register it
        pfunc is partial taking no argument. Command name
        is generated from partial's func and arguments
        """
        res = pfunc()
        # rebuild a command as string
        strcmd = pfunc.func.__name__
        strcmd += "("
        strargs = []
        if pfunc.args:
            strargs.append(",".join([repr(a) for a in pfunc.args]))
        if pfunc.keywords:
            strargs.append(",".join(["%s=%s" % (k, repr(v)) for (k, v) in pfunc.keywords.items()]))
        strcmd += ",".join(strargs)
        strcmd += ")"
        # we use force here because, very likely, the command from strcmd we generated
        # isn't part of shell's known commands (and there's a check about that when force=False)
        self.register_command(strcmd, res, force=True)
        return pfunc 
[docs]
    def extract_command_name(self, cmd):
        try:
            # extract before () (non-callable are never tracked)
            grps = re.fullmatch(r"([\w\.]+)(\(.*\))", cmd.strip()).groups()
            return grps[0]
        except AttributeError:
            raise CommandError("Can't extract command name from '%s'" % repr(cmd)) 
[docs]
    @classmethod
    def save_cmd(cls, _id, cmd):
        newcmd = jsonreadify(cmd)
        cls.cmd.replace_one({"_id": _id}, newcmd, upsert=True) 
[docs]
    def register_managers(self, managers):
        self.managers = managers 
[docs]
    def register_command(self, cmd, result, force=False):
        """
        Register a command 'cmd' inside the shell (so we can keep track of it).
        'result' is the original value that was returned when cmd was submitted.
        Depending on the type, returns a cmd number (ie. result was an asyncio task
        and we need to wait before getting the result) or directly the result of
        'cmd' execution, returning, in that case, the output.
        """
        # see if command should actually be registered
        try:
            cmdname = self.extract_command_name(cmd)
        except CommandError:
            # if can't extract command name, then don't even try to register
            # (could be, for instance, "pure" python code typed from the console)
            logging.debug("Can't extract command from %s, can't register", cmd)
            return result
        # also, never register non-callable command
        if not force and (not callable(self.extra_ns.get(cmdname)) or self.tracked.get(cmdname, True) is False):
            return result
        cmdnum = self.__class__.cmd_cnt
        cmdinfo = CommandInformation(cmd=cmd, jobs=result, started_at=time.time(), id=cmdnum, is_done=False)
        assert cmdnum not in self.__class__.launched_commands
        # register
        self.__class__.launched_commands[cmdnum] = cmdinfo
        self.__class__.save_cmd(cmdnum, cmdinfo)
        self.__class__.cmd_cnt += 1
        # TODO: clean up the following if statement, confirm with Sebastien
        if (
            type(result) == asyncio.tasks.Task
            or type(result) == asyncio.tasks._GatheringFuture
            or type(result) == asyncio.Future
            or type(result) == list
            and len(result) > 0
            and type(result[0]) == asyncio.tasks.Task
        ):
            # it's asyncio related
            result = type(result) != list and [result] or result  # TODO: cleanup and confirm this line
            cmdinfo["jobs"] = result
            return cmdinfo
        else:
            # ... and it's not asyncio related, we can display it directly
            cmdinfo["is_done"] = True
            cmdinfo["failed"] = False
            cmdinfo["started_at"] = time.time()
            cmdinfo["finished_at"] = time.time()
            cmdinfo["duration"] = "0s"
            return result 
[docs]
    def eval(self, line, return_cmdinfo=False, secure=False):
        line = line.strip()
        self.shellog.input(line)
        origline = line  # keep what's been originally entered
        # poor man's singleton...
        if line in [j["cmd"] for j in self.__class__.launched_commands.values() if not j.get("is_done")]:
            raise AlreadyRunningException("Command '%s' is already running\n" % repr(line))
        # is it a hub command, in which case, intercept and run the actual declared cmd
        # IMPORTANT !!! this is where we allow the command or not when secure=True IMPORTANT !!!
        # the logic is following:
        # - what's before parenthesis must exactly match a command
        # - parenthesis are mandatory
        # - no '&&' operator allowed
        if secure:
            # command must be alpha only, argument with "," and "=", or no arg at all
            pat = r'^([A-Za-z_]+)\(["\'\w\s=,.-]*\)$'
        else:
            pat = r"(.*)\(.*\)"  # more permissive
        m = re.match(pat, line)
        if m:
            cmd = m.groups()[0].strip()
            if secure and cmd not in self.commands:
                #  match regex rule but not a valid/existing command, discard it
                raise NoSuchCommand(cmd)
            if cmd in self.commands and isinstance(self.commands[cmd], CompositeCommand):
                line = self.commands[cmd]
        elif line != "" and secure:
            # we have something entered, it doesn't match our regex rule, discard it
            raise CommandNotAllowed(line)
        # cmdline is the actual command sent to shell, line is the one displayed
        # they can be different if there's a preprocessing
        cmdline = line
        # && cmds ? ie. chained cmds
        if "&&" in line:
            chained_cmds = [cmd for cmd in map(str.strip, line.split("&&")) if cmd]
            if len(chained_cmds) > 1:
                # need to build a command with _and and using partial, meaning passing original func param
                # to the partials
                strcmds = []
                for one_cmd in chained_cmds:
                    func, args = re.match(r"(.*)\((.*)\)", one_cmd).groups()
                    if args:
                        strcmds.append("partial(%s,%s)" % (func, args))
                    else:
                        strcmds.append("partial(%s)" % func)
                cmdline = "_and(%s)" % ",".join(strcmds)
            else:
                raise CommandError("Using '&&' operator required two operands\n")
        # r = self.run_cell(cmdline, store_history=True)
        outputs = []
        with RedirectStdStreams() as redirect_stream:
            r = self.run_cell(cmdline, store_history=True)
            self.last_std_contents = redirect_stream.get_std_contents()
        if not r.success:
            raise CommandError("%s\n" % repr(r.error_in_exec))
        else:
            # command was a success, now get the results:
            if r.result is None:
                # -> nothing special was returned, grab the stdout
                self.buf.seek(0)
                # from print stdout ?
                b = self.buf.read()
                outputs.append(b)
                # clear buffer
                self.buf.seek(0)
                self.buf.truncate()
            else:
                # -> we have something returned...
                res = self.register_command(cmd=origline, result=r.result)
                # if type(res) != CommandInformation:    # TODO: remove this line
                if not isinstance(res, CommandInformation):
                    # if type(res) != str:    # TODO: remove this line
                    if not isinstance(res, str):
                        outputs.append(pformat(res))
                    else:
                        outputs.append(res)
                else:
                    if return_cmdinfo:
                        return res
        # Note: this will cause all outputs to go to one SSH session, ie. if multiple users
        # are logged, only one will the results
        if self.__class__.pending_outputs:
            outputs.extend(self.__class__.pending_outputs.values())
            self.__class__.pending_outputs = {}
        return outputs 
    # @classmethod
    # def cancel(klass,jobnum):
    #    return klass.launched_commands.get(jobnum)
[docs]
    @classmethod
    def refresh_commands(cls):
        for num, info in sorted(cls.launched_commands.items()):
            # already process, this current command is now history
            # Note: if we have millions of commands there, it could last quite a while,
            # but IRL we only have a few
            if info.get("is_done") is True:
                continue
            # is_done = set([j.done() for j in info["jobs"]]) == set([True])   # TODO: remove this line
            is_done = {j.done() for j in info["jobs"]} == {True}
            has_err = is_done and [True for j in info["jobs"] if j.exception()] or None
            localoutputs = (
                is_done
                and ([str(j.exception()) for j in info["jobs"] if j.exception()] or [j.result() for j in info["jobs"]])
                or None
            )
            if is_done:
                cls.launched_commands[num]["is_done"] = True
                cls.launched_commands[num]["failed"] = has_err and has_err[0] or False
                cls.launched_commands[num]["results"] = localoutputs
                cls.launched_commands[num]["finished_at"] = time.time()
                cls.launched_commands[num]["duration"] = timesofar(
                    t0=cls.launched_commands[num]["started_at"],
                    t1=cls.launched_commands[num]["finished_at"],
                )
                cls.save_cmd(num, cls.launched_commands[num])
                if not has_err and localoutputs and set(map(type, localoutputs)) == {str}:
                    localoutputs = "\n" + "".join(localoutputs)
                cls.pending_outputs[num] = "[%s] %s {%s} %s: finished %s " % (
                    num,
                    has_err and "ERR" or "OK",
                    timesofar(info["started_at"]),
                    info["cmd"],
                    localoutputs,
                )
            else:
                cls.pending_outputs[num] = "[%s] RUN {%s} %s" % (
                    num,
                    timesofar(info["started_at"]),
                    info["cmd"],
                ) 
[docs]
    @classmethod
    def command_info(cls, id=None, running=None, failed=None):
        cmds = {}
        if id is not None:
            try:
                id = int(id)
                return jsonreadify(cls.launched_commands[id])
            except KeyError:
                raise CommandError("No such command with ID %s" % repr(id))
            except ValueError:
                raise CommandError("Invalid ID %s" % repr(id))
        if running is not None:
            is_done = not to_boolean(running)
        else:
            is_done = None
        if failed is not None:
            failed = to_boolean(failed)
        for _id, cmd in cls.launched_commands.items():
            if is_done is not None:
                # running or done commands (not both)
                if cmd.get("is_done") == is_done:
                    # done + failed (a failed command is always done btw)
                    if failed is not None and cmd.get("is_done") is True:
                        if cmd.get("failed") == failed:
                            cmds[_id] = jsonreadify(cmd)
                    else:
                        # don't care if failed or not
                        cmds[_id] = jsonreadify(cmd)
                else:
                    # If asked is_done=true, it means command _id has is_done=false
                    # if we get there. So the command is sill running, so we don't
                    # know if it failed or not, so no need to check failed there,
                    # it's been handled above.
                    # If asksed is_done=false, we don't need to check failed,
                    # same logic applies
                    continue
            else:
                # either running or done commands (both)
                if failed is not None and cmd.get("is_done") is True:
                    if cmd.get("failed") == failed:
                        cmds[_id] = jsonreadify(cmd)
                else:
                    # don't care if failed or not
                    cmds[_id] = jsonreadify(cmd)
        return cmds 
 
####################
# DEFAULT HUB CMDS #
####################
# these can be used in client code to define
# commands. partial should be used to pass the
# required arguments, eg.:
# {"schedule" ; partial(schedule,loop)}
[docs]
def stats(src_dump):
    pass 
[docs]
def template_out(field, confdict):
    """
    Return field as a templated-out filed,
    substituting some "%(...)s" part with confdict,
    Fields can follow dotfield notation.
    Fields like "$(...)" are replaced with a timestamp
    following specified format (see time.strftime)
    Example::
        confdict = {"a":"one"}
        field = "%(a)s_two_three_$(%Y%m)"
        => "one_two_three_201908" # assuming we're in August 2019
    """
    # first deal with timestamp
    pat = re.compile(r".*(\$\((.*?)\)).*")
    try:
        m = pat.match(field)
    except TypeError:
        # not string/byte-like just skip the process
        return field
    if m:
        tosub, fmt = m.groups()
        ts = datetime.datetime.now().strftime("%%%s" % fmt)
        field = field.replace(tosub, ts)
    flatdict = flatten_doc(confdict)
    # then use dict to sub keys
    field = field % flatdict
    return field 
[docs]
def publish_data_version(s3_bucket, s3_folder, version_info, update_latest=True, aws_key=None, aws_secret=None):
    """
    Update remote files:
        - versions.json: add version_info to the JSON list
                        or replace if arg version_info is a list
        - latest.json: update redirect so it points to latest version url
    "versions" is dict such as::
        {"build_version":"...",         # version name for this release/build
         "require_version":"...",       # version required for incremental update
         "target_version": "...",       # version reached once update is applied
         "type" : "incremental|full"    # release type
         "release_date" : "...",        # ISO 8601 timestamp, release date/time
         "url": "http...."}             # url pointing to release metadata
    """
    # import utils.aws within this function to avoid boto3 import error in
    # the same like CLI (boto3 is not required)
    import biothings.utils.aws as aws
    # register version
    versionskey = os.path.join(s3_folder, "%s.json" % VERSIONS)
    try:
        versions = json.loads(
            aws.get_s3_file_contents(versionskey, aws_key=aws_key, aws_secret=aws_secret, s3_bucket=s3_bucket).decode()
        )
    except (FileNotFoundError, json.JSONDecodeError):
        versions = {"format": "1.0", "versions": []}
    if isinstance(version_info, list):
        versions["versions"] = version_info
    else:
        # used to check duplicates
        tmp = {}
        # [tmp.setdefault(e["build_version"], e) for e in versions["versions"]]   # TODO: remove this line
        for e in versions["versions"]:
            tmp.setdefault(e["build_version"], e)
        tmp[version_info["build_version"]] = version_info
        # order by build_version
        versions["versions"] = sorted(tmp.values(), key=lambda e: e["build_version"])
    aws.send_s3_file(
        None,
        versionskey,
        content=json.dumps(versions, indent=True),
        aws_key=aws_key,
        aws_secret=aws_secret,
        s3_bucket=s3_bucket,
        content_type="application/json",
        overwrite=True,
    )
    # update latest
    if not isinstance(version_info, list) and update_latest:
        latestkey = os.path.join(s3_folder, "%s.json" % LATEST)
        newredir = os.path.join("/", s3_folder, "{}.json".format(version_info["build_version"]))
        # the consensus is that we will upload the data and have the
        # redirection, for record-keep purpose
        aws.send_s3_file(
            None,
            latestkey,
            content=json.dumps(version_info["build_version"], indent=True),
            content_type="application/json",
            aws_key=aws_key,
            aws_secret=aws_secret,
            s3_bucket=s3_bucket,
            overwrite=True,
            redirect=newredir,
        ) 
def _and(*funcs):
    """
    Calls passed functions, one by one. If one fails, then it stops.
    Function should return a asyncio Task. List of one Task only are also permitted.
    Partial can be used to pass arguments to functions.
    Ex: _and(f1,f2,partial(f3,arg1,kw=arg2))
    """
    all_res = []
    func1 = funcs[0]
    func2 = None
    fut1 = func1()
    # if type(fut1) == list:   # TODO: remove this line
    if isinstance(fut1, list):
        assert len(fut1) == 1, "Can't deal with list of more than 1 task: %s" % fut1
        fut1 = fut1.pop()
    if not isinstance(fut1, asyncio.Future):
        raise CommandError("First command didn't return a future, can't chain commands")
    all_res.append(fut1)
    # err = None
    def do(f, cb):
        f.result()  # consume exception if any
        if cb:
            all_res.extend(_and(cb, *funcs))
    if len(funcs) > 1:
        func2 = funcs[1]
        if len(funcs) > 2:
            funcs = funcs[2:]
        else:
            funcs = []
    fut1.add_done_callback(partial(do, cb=func2))
    return all_res
[docs]
class CompositeCommand(str):
    """
    Defines a composite hub commands, that is,
    a new command made of other commands. Useful to define
    shortcuts when typing commands in hub console.
    """
    def __init__(self, cmd):
        self.cmd = cmd
    def __str__(self):
        return "<CompositeCommand: '%s'>" % self.cmd 
############
# RELOADER #
############
[docs]
def exclude_from_reloader(path):
    # exlucde cached, git and hidden files
    return path.endswith("__pycache__") or ".git" in path or os.path.basename(path).startswith(".") 
[docs]
class BaseHubReloader(object):
    """
    Monitor sources' code and reload hub accordingly to update running code
    """
    def __init__(self, paths, reload_func, wait=5.0):
        """
        Monitor given paths for directory deletion/creation
        and for file deletion/creation. Poll for events every 'wait' seconds.
        """
        raise NotImplementedError("Implement me in a sub-class")
[docs]
    def poll(self):
        """Start monitoring changes on files and/directories"""
        raise NotImplementedError("Implement me in a sub-class") 
[docs]
    def watched_files(self):
        """Return a list of files/directories being watched"""
        raise NotImplementedError("Implement me in a sub-class") 
 
[docs]
class TornadoAutoReloadHubReloader(BaseHubReloader):
    """Reloader based on tornado.autoreload module"""
    def __init__(self, paths, reload_func, wait=5):
        self.mod = sys.modules["tornado.autoreload"]
        if isinstance(paths, str):
            paths = [paths]
        paths = set(paths)  # get rid of duplicated, just in case
        self.reload_func = reload_func
        self.mod.add_reload_hook(self.reload_func)
        # only listen to these events. Note: directory detection is done via a flag so
        # no need to use IS_DIR
        self.add_watch(paths)
        self.wait = wait
[docs]
    def monitor(self):
        logging.info("Monitoring source code in, %s:\n%s", repr(self.paths), pformat(self.watched_files()))
        self.mod.start(self.wait * 1000)  # millis 
[docs]
    def add_watch(self, paths):
        """This method recursively adds the input paths, and their children to tornado autoreload for watching them.
        If any file changes, the tornado will call our hook to reload the hub.
        Each path will be forced to become an absolute path.
        If a path is matched excluding patterns, it will be ignored.
        Only file is added for watching. Directory will be passed to another add_watch.
        """
        input_paths = paths.copy()
        self.paths = []
        for path in input_paths:
            if not os.path.isabs(path):
                path = os.path.abspath(path)
            if exclude_from_reloader(path):
                continue
            self.paths.append(path)
            self.mod.watch(path)
            for dirpath, dirnames, filenames in os.walk(path):
                if exclude_from_reloader(dirpath):
                    continue
                # Add file to watcher
                for fn in filenames:
                    f_path = os.path.join(dirpath, fn)
                    if exclude_from_reloader(f_path):
                        continue
                    self.mod.watch(f_path)
                # add dirnames' contents to watcher
                self.add_watch([os.path.join(dirpath, dirname) for dirname in dirnames]) 
[docs]
    def watched_files(self):
        return self.mod._watched_files 
 
        # return [d for d in self.mod._watched_files if os.path.isdir(d)]
[docs]
def get_hub_reloader(*args, **kwargs):
    if getattr(config, "USE_RELOADER", False):
        import tornado.autoreload  # noqa
        logging.info("Using Hub reloader based on tornado.autoreload")
        def ensure_run_task(func):
            def wrapper():
                result = func()
                if isinstance(result, asyncio.Future):
                    asyncio.get_event_loop().run_until_complete(result)
            return wrapper
        if kwargs.get("reload_func"):
            logging.info("Decorator reload_func to ensure it can be run")
            kwargs["reload_func"] = ensure_run_task(kwargs["reload_func"])
        return TornadoAutoReloadHubReloader(*args, **kwargs)
    else:
        logging.info("USE_RELOADER not set (or False), won't monitor for changes")
        return None