from functools import lru_cache
import json
import mimetypes
import os
from os.path import exists, getmtime, getsize, isdir, join, normcase, normpath
from pkg_resources import resource_exists, resource_filename, resource_isdir
from pyramid.asset import abspath_from_asset_spec, resolve_asset_spec
from pyramid.httpexceptions import HTTPMovedPermanently, HTTPNotFound
from pyramid.path import caller_package
from pyramid.response import FileResponse, _guess_type
from pyramid.traversal import traversal_path_info
[docs]
class static_view:
    """An instance of this class is a callable which can act as a
    :app:`Pyramid` :term:`view callable`; this view will serve
    static files from a directory on disk based on the ``root_dir``
    you provide to its constructor.
    The directory may contain subdirectories (recursively); the static
    view implementation will descend into these directories as
    necessary based on the components of the URL in order to resolve a
    path into a response.
    You may pass an absolute or relative filesystem path or a
    :term:`asset specification` representing the directory
    containing static files as the ``root_dir`` argument to this
    class' constructor.
    If the ``root_dir`` path is relative, and the ``package_name``
    argument is ``None``, ``root_dir`` will be considered relative to
    the directory in which the Python file which *calls* ``static``
    resides.  If the ``package_name`` name argument is provided, and a
    relative ``root_dir`` is provided, the ``root_dir`` will be
    considered relative to the Python :term:`package` specified by
    ``package_name`` (a dotted path to a Python package).
    ``cache_max_age`` influences the ``Expires`` and ``Max-Age``
    response headers returned by the view (default is 3600 seconds or
    one hour).
    ``use_subpath`` influences whether ``request.subpath`` will be used as
    ``PATH_INFO`` when calling the underlying WSGI application which actually
    serves the static files.  If it is ``True``, the static application will
    consider ``request.subpath`` as ``PATH_INFO`` input.  If it is ``False``,
    the static application will consider request.environ[``PATH_INFO``] as
    ``PATH_INFO`` input. By default, this is ``False``.
    ``reload`` controls whether a cache of files is maintained or the asset
    subsystem is queried per-request to determine what files are available.
    By default, this is ``False`` and new files added while the process is
    running are not recognized.
    ``content_encodings`` is a list of alternative file encodings supported
    in the ``Accept-Encoding`` HTTP Header. Alternative files are found using
    file extensions defined in :attr:`mimetypes.encodings_map`. An encoded
    asset will be returned with the ``Content-Encoding`` header set to the
    selected encoding. If the asset contains alternative encodings then the
    ``Accept-Encoding`` value will be added to the response's ``Vary`` header.
    By default, the list is empty and no alternatives will be supported.
    .. note::
       If the ``root_dir`` is relative to a :term:`package`, or is a
       :term:`asset specification` the :app:`Pyramid`
       :class:`pyramid.config.Configurator` method can be used to override
       assets within the named ``root_dir`` package-relative directory.
       However, if the ``root_dir`` is absolute, configuration will not be able
       to override the assets it contains.
    .. versionchanged:: 2.0
       Added ``reload`` and ``content_encodings`` options.
    """
    def __init__(
        self,
        root_dir,
        cache_max_age=3600,
        package_name=None,
        use_subpath=False,
        index='index.html',
        reload=False,
        content_encodings=(),
    ):
        # package_name is for bw compat; it is preferred to pass in a
        # package-relative path as root_dir
        # (e.g. ``anotherpackage:foo/static``).
        self.cache_max_age = cache_max_age
        if package_name is None:
            package_name = caller_package().__name__
        package_name, docroot = resolve_asset_spec(root_dir, package_name)
        self.use_subpath = use_subpath
        self.package_name = package_name
        self.docroot = docroot
        self.norm_docroot = normcase(normpath(docroot))
        self.index = index
        self.reload = reload
        self.content_encodings = _compile_content_encodings(content_encodings)
        self.filemap = {}
    def __call__(self, context, request):
        resource_name = self.get_resource_name(request)
        files = self.get_possible_files(resource_name)
        filepath, content_encoding = self.find_best_match(request, files)
        if filepath is None:
            raise HTTPNotFound(request.url)
        content_type, _ = _guess_type(resource_name)
        response = FileResponse(
            filepath,
            request,
            self.cache_max_age,
            content_type,
            content_encoding,
        )
        if len(files) > 1:
            _add_vary(response, 'Accept-Encoding')
        return response
[docs]
    def get_resource_name(self, request):
        """
        Return the computed name of the requested resource.
        The returned file is not guaranteed to exist.
        """
        if self.use_subpath:
            path_tuple = request.subpath
        else:
            path_tuple = traversal_path_info(request.path_info)
        path = _secure_path(path_tuple)
        if path is None:
            raise HTTPNotFound('Out of bounds: %s' % request.url)
        # normalize asset spec or fs path into resource_path
        if self.package_name:  # package resource
            resource_path = '%s/%s' % (self.docroot.rstrip('/'), path)
            if resource_isdir(self.package_name, resource_path):
                if not request.path_url.endswith('/'):
                    raise self.add_slash_redirect(request)
                resource_path = '%s/%s' % (
                    resource_path.rstrip('/'),
                    self.index,
                )
        else:  # filesystem file
            # os.path.normpath converts / to \ on windows
            resource_path = normcase(normpath(join(self.norm_docroot, path)))
            if isdir(resource_path):
                if not request.path_url.endswith('/'):
                    raise self.add_slash_redirect(request)
                resource_path = join(resource_path, self.index)
        return resource_path 
[docs]
    def find_resource_path(self, name):
        """
        Return the absolute path to the resource or ``None`` if it doesn't
        exist.
        """
        if self.package_name:
            if resource_exists(self.package_name, name):
                return resource_filename(self.package_name, name)
        elif exists(name):
            return name 
[docs]
    def get_possible_files(self, resource_name):
        """ Return a sorted list of ``(size, encoding, path)`` entries."""
        result = self.filemap.get(resource_name)
        if result is not None:
            return result
        # XXX we could put a lock around this work but worst case scenario a
        # couple requests scan the disk for files at the same time and then
        # the cache is set going forward so do not bother
        result = []
        # add the identity
        path = self.find_resource_path(resource_name)
        if path:
            result.append((path, None))
        # add each file we find for the supported encodings
        # we don't mind adding multiple files for the same encoding if there
        # are copies with different extensions because we sort by size so the
        # smallest is always found first and the rest ignored
        for encoding, extensions in self.content_encodings.items():
            for ext in extensions:
                encoded_name = resource_name + ext
                path = self.find_resource_path(encoded_name)
                if path:
                    result.append((path, encoding))
        # sort the files by size, smallest first
        result.sort(key=lambda x: getsize(x[0]))
        # only cache the results if reload is disabled
        if not self.reload:
            self.filemap[resource_name] = result
        return result 
[docs]
    def find_best_match(self, request, files):
        """ Return ``(path | None, encoding)``."""
        # if the client did not specify encodings then assume only the
        # identity is acceptable
        if not request.accept_encoding:
            identity_path = next(
                (path for path, encoding in files if encoding is None),
                None,
            )
            return identity_path, None
        # find encodings the client will accept
        acceptable_encodings = {
            x[0]
            for x in request.accept_encoding.acceptable_offers(
                [encoding for path, encoding in files if encoding is not None]
            )
        }
        acceptable_encodings.add(None)
        # return the smallest file from the acceptable encodings
        # we know that files is sorted by size, smallest first
        for path, encoding in files:
            if encoding in acceptable_encodings:
                return path, encoding
        return None, None 
    def add_slash_redirect(self, request):
        url = request.path_url + '/'
        qs = request.query_string
        if qs:
            url = url + '?' + qs
        return HTTPMovedPermanently(url) 
def _compile_content_encodings(encodings):
    """
    Convert mimetypes.encodings_map into a dict of
    ``(encoding) -> [file extensions]``.
    """
    result = {}
    for ext, encoding in mimetypes.encodings_map.items():
        if encoding in encodings:
            result.setdefault(encoding, []).append(ext)
    return result
def _add_vary(response, option):
    vary = response.vary or []
    if not any(x.lower() == option.lower() for x in vary):
        vary.append(option)
    response.vary = vary
_seps = {'/', os.sep}
def _contains_slash(item):
    for sep in _seps:
        if sep in item:
            return True
_has_insecure_pathelement = {'..', '.', ''}.intersection
@lru_cache(1000)
def _secure_path(path_tuple):
    if _has_insecure_pathelement(path_tuple):
        # belt-and-suspenders security; this should never be true
        # unless someone screws up the traversal_path code
        # (request.subpath is computed via traversal_path too)
        return None
    if any([_contains_slash(item) for item in path_tuple]):
        return None
    encoded = '/'.join(path_tuple)  # will be unicode
    return encoded
[docs]
class QueryStringCacheBuster:
    """
    An implementation of :class:`~pyramid.interfaces.ICacheBuster` which adds
    a token for cache busting in the query string of an asset URL.
    The optional ``param`` argument determines the name of the parameter added
    to the query string and defaults to ``'x'``.
    To use this class, subclass it and provide a ``tokenize`` method which
    accepts ``request, pathspec, kw`` and returns a token.
    .. versionadded:: 1.6
    """
    def __init__(self, param='x'):
        self.param = param
    def __call__(self, request, subpath, kw):
        token = self.tokenize(request, subpath, kw)
        query = kw.setdefault('_query', {})
        if isinstance(query, dict):
            query[self.param] = token
        else:
            kw['_query'] = tuple(query) + ((self.param, token),)
        return subpath, kw 
[docs]
class QueryStringConstantCacheBuster(QueryStringCacheBuster):
    """
    An implementation of :class:`~pyramid.interfaces.ICacheBuster` which adds
    an arbitrary token for cache busting in the query string of an asset URL.
    The ``token`` parameter is the token string to use for cache busting and
    will be the same for every request.
    The optional ``param`` argument determines the name of the parameter added
    to the query string and defaults to ``'x'``.
    .. versionadded:: 1.6
    """
    def __init__(self, token, param='x'):
        super().__init__(param=param)
        self._token = token
    def tokenize(self, request, subpath, kw):
        return self._token 
[docs]
class ManifestCacheBuster:
    """
    An implementation of :class:`~pyramid.interfaces.ICacheBuster` which
    uses a supplied manifest file to map an asset path to a cache-busted
    version of the path.
    The ``manifest_spec`` can be an absolute path or a :term:`asset
    specification` pointing to a package-relative file.
    The manifest file is expected to conform to the following simple JSON
    format:
    .. code-block:: json
       {
           "css/main.css": "css/main-678b7c80.css",
           "images/background.png": "images/background-a8169106.png",
       }
    By default, it is a JSON-serialized dictionary where the keys are the
    source asset paths used in calls to
    :meth:`~pyramid.request.Request.static_url`. For example:
    .. code-block:: pycon
       >>> request.static_url('myapp:static/css/main.css')
       "http://www.example.com/static/css/main-678b7c80.css"
    The file format and location can be changed by subclassing and overriding
    :meth:`.parse_manifest`.
    If a path is not found in the manifest it will pass through unchanged.
    If ``reload`` is ``True`` then the manifest file will be reloaded when
    changed. It is not recommended to leave this enabled in production.
    If the manifest file cannot be found on disk it will be treated as
    an empty mapping unless ``reload`` is ``False``.
    .. versionadded:: 1.6
    """
    exists = staticmethod(exists)  # testing
    getmtime = staticmethod(getmtime)  # testing
    def __init__(self, manifest_spec, reload=False):
        package_name = caller_package().__name__
        self.manifest_path = abspath_from_asset_spec(
            manifest_spec, package_name
        )
        self.reload = reload
        self._mtime = None
        if not reload:
            self._manifest = self.get_manifest()
    def get_manifest(self):
        with open(self.manifest_path, 'rb') as fp:
            return self.parse_manifest(fp.read())
[docs]
    def parse_manifest(self, content):
        """
        Parse the ``content`` read from the ``manifest_path`` into a
        dictionary mapping.
        Subclasses may override this method to use something other than
        ``json.loads`` to load any type of file format and return a conforming
        dictionary.
        """
        return json.loads(content.decode('utf-8')) 
    @property
    def manifest(self):
        """ The current manifest dictionary."""
        if self.reload:
            if not self.exists(self.manifest_path):
                return {}
            mtime = self.getmtime(self.manifest_path)
            if self._mtime is None or mtime > self._mtime:
                self._manifest = self.get_manifest()
                self._mtime = mtime
        return self._manifest
    def __call__(self, request, subpath, kw):
        subpath = self.manifest.get(subpath, subpath)
        return (subpath, kw)