Source code for hupper.worker

import os
import signal
import sys
import threading
import time
import traceback

from .compat import get_pyc_path
from .compat import get_py_path
from .compat import interrupt_main
from .interfaces import IReloaderProxy
from . import ipc


class WatchSysModules(threading.Thread):
    """ Poll ``sys.modules`` for imported modules."""
    poll_interval = 1

    def __init__(self, callback):
        super(WatchSysModules, self).__init__()
        self.paths = set()
        self.callback = callback
        self.lock = threading.Lock()
        self.stopped = False

    def run(self):
        while not self.stopped:
            self.update_paths()
            time.sleep(self.poll_interval)

    def stop(self):
        self.stopped = True

    def update_paths(self):
        """ Check sys.modules for paths to add to our path set."""
        new_paths = []
        with self.lock:
            for path in expand_source_paths(iter_module_paths()):
                if path not in self.paths:
                    self.paths.add(path)
                    new_paths.append(path)
        if new_paths:
            self.callback(new_paths)

    def search_traceback(self, tb):
        """ Inspect a traceback for new paths to add to our path set."""
        new_paths = []
        with self.lock:
            for filename, line, funcname, txt in traceback.extract_tb(tb):
                path = os.path.abspath(filename)
                if path not in self.paths:
                    self.paths.add(path)
                    new_paths.append(path)
        if new_paths:
            self.callback(new_paths)


def expand_source_paths(paths):
    """ Convert pyc files into their source equivalents."""
    for src_path in paths:
        yield src_path

        # track pyc files for py files
        if src_path.endswith('.py'):
            pyc_path = get_pyc_path(src_path)
            if pyc_path:
                yield pyc_path

        # track py files for pyc files
        elif src_path.endswith('.pyc'):
            py_path = get_py_path(src_path)
            if py_path:
                yield py_path


def iter_module_paths(modules=None):
    """ Yield paths of all imported modules."""
    modules = modules or list(sys.modules.values())
    for module in modules:
        try:
            filename = module.__file__
        except (AttributeError, ImportError):  # pragma: nocover
            continue
        if filename is not None:
            abs_filename = os.path.abspath(filename)
            if os.path.isfile(abs_filename):
                yield abs_filename


class WatchForParentShutdown(threading.Thread):
    """ Watch the pipe to ensure the parent is still alive."""
    def __init__(self, pipe):
        super(WatchForParentShutdown, self).__init__()
        self.pipe = pipe

    def run(self):
        try:
            # wait until the pipe breaks
            while self.pipe.recv():  # pragma: nocover
                pass
        except EOFError:
            pass
        interrupt_main()


class Worker(object):
    """ A helper object for managing a worker process lifecycle. """
    def __init__(self, spec, args=None, kwargs=None):
        super(Worker, self).__init__()
        self.worker_spec = spec
        self.worker_args = args
        self.worker_kwargs = kwargs
        self.pipe, self._child_pipe = ipc.Pipe()
        self.terminated = False
        self.pid = None
        self.process = None
        self.exitcode = None
        self.stdin_termios = None

    def start(self):
        self.stdin_termios = ipc.snapshot_termios(sys.stdin.fileno())

        kw = dict(
            spec=self.worker_spec,
            spec_args=self.worker_args,
            spec_kwargs=self.worker_kwargs,
            pipe=self._child_pipe,
        )
        self.process = ipc.spawn(
            'hupper.worker.worker_main',
            kwargs=kw,
            pass_fds=[self._child_pipe.r_fd, self._child_pipe.w_fd],
        )
        self.pid = self.process.pid

        # activate the pipe after forking
        self.pipe.activate()

        # kill the child side of the pipe after forking as the child is now
        # responsible for it
        self._child_pipe.close()

    def is_alive(self):
        if self.process:
            return self.process.poll() is None
        return False

    def terminate(self):
        self.terminated = True
        self.process.terminate()

    def join(self):
        self.process.wait()
        self.exitcode = self.process.returncode

        if self.stdin_termios:
            ipc.restore_termios(sys.stdin.fileno(), self.stdin_termios)

        if self.pipe:
            try:
                self.pipe.close()
            except:  # pragma: nocover
                pass
            finally:
                self.pipe = None


# set when the current process is being monitored
_reloader_proxy = None


[docs] def get_reloader(): """ Get a reference to the current :class:`hupper.interfaces.IReloaderProxy`. Raises a ``RuntimeError`` if the current process is not actively being monitored by a parent process. """ if _reloader_proxy is None: raise RuntimeError('process is not controlled by hupper') return _reloader_proxy
[docs] def is_active(): """ Return ``True`` if the current process being monitored by a parent process. """ return _reloader_proxy is not None
class ReloaderProxy(IReloaderProxy): def __init__(self, pipe): self.pipe = pipe def watch_files(self, files): self.pipe.send(('watch', files)) def trigger_reload(self): self.pipe.send(('reload',)) def worker_main(spec, pipe, spec_args=None, spec_kwargs=None): if spec_args is None: spec_args = [] if spec_kwargs is None: spec_kwargs = {} # activate the pipe after forking pipe.activate() # SIGHUP is not supported on windows if hasattr(signal, 'SIGHUP'): signal.signal(signal.SIGHUP, signal.SIG_IGN) # disable pyc files for project code because it can cause timestamp # issues in which files are reloaded twice sys.dont_write_bytecode = True global _reloader_proxy _reloader_proxy = ReloaderProxy(pipe) parent_watcher = WatchForParentShutdown(pipe) parent_watcher.daemon = True parent_watcher.start() poller = WatchSysModules(_reloader_proxy.watch_files) poller.daemon = True poller.start() # import the worker path before polling sys.modules func = ipc.resolve_spec(spec) # start the worker try: func(*spec_args, **spec_kwargs) except: try: # attempt to send imported paths to the master prior to crashing poller.update_paths() poller.search_traceback(sys.exc_info()[2]) poller.stop() poller.join() except: # pragma: no cover pass raise