Source code for hupper.watchman

# check ``hupper.utils.is_watchman_supported`` before using this module
import json
import os
import queue
import socket
import threading
import time

from .interfaces import IFileMonitor
from .utils import get_watchman_sockpath


[docs] class WatchmanFileMonitor(threading.Thread, IFileMonitor): """ An :class:`hupper.interfaces.IFileMonitor` that uses Facebook's ``watchman`` daemon to detect changes. ``callback`` is a callable that accepts a path to a changed file. """ def __init__( self, callback, logger, sockpath=None, binpath='watchman', timeout=1.0, **kw, ): super(WatchmanFileMonitor, self).__init__() self.callback = callback self.logger = logger self.watches = set() self.paths = set() self.lock = threading.Lock() self.enabled = True self.sockpath = sockpath self.binpath = binpath self.timeout = timeout self.responses = queue.Queue() def add_path(self, path): with self.lock: root = os.path.dirname(path) for watch in self.watches: if watch == root or root.startswith(watch + os.sep): break else: self._watch(root) if path not in self.paths: self.paths.add(path) def start(self): sockpath = self._resolve_sockpath() sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.settimeout(self.timeout) sock.connect(sockpath) self._sock = sock self._recvbufs = [] self._send(['version']) result = self._recv() self.logger.debug('Connected to watchman v' + result['version'] + '.') super(WatchmanFileMonitor, self).start() def join(self): try: return super(WatchmanFileMonitor, self).join() finally: self._sock.close() self._sock = None def run(self): while self.enabled: try: result = self._recv() except socket.timeout: continue if 'warning' in result: self.logger.error('watchman warning: ' + result['warning']) if 'error' in result: self.logger.error('watchman error: ' + result['error']) if 'subscription' in result: root = result['root'] files = result['files'] with self.lock: for f in files: if isinstance(f, dict): f = f['name'] path = os.path.join(root, f) if path in self.paths: self.callback(path) if not self._is_unilateral(result): self.responses.put(result) def _is_unilateral(self, result): if 'unilateral' in result and result['unilateral']: return True # fallback to checking for known unilateral responses for k in ['log', 'subscription']: if k in result: return True return False def stop(self): self.enabled = False def _resolve_sockpath(self): if self.sockpath: return self.sockpath return get_watchman_sockpath(self.binpath) def _watch(self, root): result = self._query(['watch-project', root]) if result['watch'] != root: root = result['watch'] self.logger.debug('Watchman is tracking root: ' + root) self._query( [ 'subscribe', root, '{}.{}.{}'.format(os.getpid(), id(self), root), { # +1 second because we don't want any buffered changes # if the daemon is already watching the folder 'since': int(time.time() + 1), 'expression': ['type', 'f'], 'fields': ['name'], }, ] ) self.watches.add(root) def _readline(self): # buffer may already have a line if len(self._recvbufs) == 1 and b'\n' in self._recvbufs[0]: line, b = self._recvbufs[0].split(b'\n', 1) self._recvbufs = [b] return line while True: b = self._sock.recv(4096) if not b: raise RuntimeError('lost connection to watchman') if b'\n' in b: result = b''.join(self._recvbufs) line, b = b.split(b'\n', 1) self._recvbufs = [b] return result + line self._recvbufs.append(b) def _recv(self): line = self._readline().decode('utf8') try: return json.loads(line) except Exception: # pragma: no cover self.logger.info('Ignoring corrupted payload from watchman.') return {} def _send(self, msg): cmd = json.dumps(msg).encode('ascii') self._sock.sendall(cmd + b'\n') def _query(self, msg, timeout=None): self._send(msg) return self.responses.get(timeout=timeout)