from functools import lru_cache
import json
import mimetypes
import os
from os.path import exists, getmtime, getsize, isdir, join, normcase, normpath
from pkg_resources import resource_exists, resource_filename, resource_isdir
from pyramid.asset import abspath_from_asset_spec, resolve_asset_spec
from pyramid.httpexceptions import HTTPMovedPermanently, HTTPNotFound
from pyramid.path import caller_package
from pyramid.response import FileResponse, _guess_type
from pyramid.traversal import traversal_path_info
[docs]
class static_view:
"""An instance of this class is a callable which can act as a
:app:`Pyramid` :term:`view callable`; this view will serve
static files from a directory on disk based on the ``root_dir``
you provide to its constructor.
The directory may contain subdirectories (recursively); the static
view implementation will descend into these directories as
necessary based on the components of the URL in order to resolve a
path into a response.
You may pass an absolute or relative filesystem path or a
:term:`asset specification` representing the directory
containing static files as the ``root_dir`` argument to this
class' constructor.
If the ``root_dir`` path is relative, and the ``package_name``
argument is ``None``, ``root_dir`` will be considered relative to
the directory in which the Python file which *calls* ``static``
resides. If the ``package_name`` name argument is provided, and a
relative ``root_dir`` is provided, the ``root_dir`` will be
considered relative to the Python :term:`package` specified by
``package_name`` (a dotted path to a Python package).
``cache_max_age`` influences the ``Expires`` and ``Max-Age``
response headers returned by the view (default is 3600 seconds or
one hour).
``use_subpath`` influences whether ``request.subpath`` will be used as
``PATH_INFO`` when calling the underlying WSGI application which actually
serves the static files. If it is ``True``, the static application will
consider ``request.subpath`` as ``PATH_INFO`` input. If it is ``False``,
the static application will consider request.environ[``PATH_INFO``] as
``PATH_INFO`` input. By default, this is ``False``.
``reload`` controls whether a cache of files is maintained or the asset
subsystem is queried per-request to determine what files are available.
By default, this is ``False`` and new files added while the process is
running are not recognized.
``content_encodings`` is a list of alternative file encodings supported
in the ``Accept-Encoding`` HTTP Header. Alternative files are found using
file extensions defined in :attr:`mimetypes.encodings_map`. An encoded
asset will be returned with the ``Content-Encoding`` header set to the
selected encoding. If the asset contains alternative encodings then the
``Accept-Encoding`` value will be added to the response's ``Vary`` header.
By default, the list is empty and no alternatives will be supported.
.. note::
If the ``root_dir`` is relative to a :term:`package`, or is a
:term:`asset specification` the :app:`Pyramid`
:class:`pyramid.config.Configurator` method can be used to override
assets within the named ``root_dir`` package-relative directory.
However, if the ``root_dir`` is absolute, configuration will not be able
to override the assets it contains.
.. versionchanged:: 2.0
Added ``reload`` and ``content_encodings`` options.
"""
def __init__(
self,
root_dir,
cache_max_age=3600,
package_name=None,
use_subpath=False,
index='index.html',
reload=False,
content_encodings=(),
):
# package_name is for bw compat; it is preferred to pass in a
# package-relative path as root_dir
# (e.g. ``anotherpackage:foo/static``).
self.cache_max_age = cache_max_age
if package_name is None:
package_name = caller_package().__name__
package_name, docroot = resolve_asset_spec(root_dir, package_name)
self.use_subpath = use_subpath
self.package_name = package_name
self.docroot = docroot
self.norm_docroot = normcase(normpath(docroot))
self.index = index
self.reload = reload
self.content_encodings = _compile_content_encodings(content_encodings)
self.filemap = {}
def __call__(self, context, request):
resource_name = self.get_resource_name(request)
files = self.get_possible_files(resource_name)
filepath, content_encoding = self.find_best_match(request, files)
if filepath is None:
raise HTTPNotFound(request.url)
content_type, _ = _guess_type(resource_name)
response = FileResponse(
filepath,
request,
self.cache_max_age,
content_type,
content_encoding,
)
if len(files) > 1:
_add_vary(response, 'Accept-Encoding')
return response
[docs]
def get_resource_name(self, request):
"""
Return the computed name of the requested resource.
The returned file is not guaranteed to exist.
"""
if self.use_subpath:
path_tuple = request.subpath
else:
path_tuple = traversal_path_info(request.path_info)
path = _secure_path(path_tuple)
if path is None:
raise HTTPNotFound('Out of bounds: %s' % request.url)
# normalize asset spec or fs path into resource_path
if self.package_name: # package resource
resource_path = '%s/%s' % (self.docroot.rstrip('/'), path)
if resource_isdir(self.package_name, resource_path):
if not request.path_url.endswith('/'):
raise self.add_slash_redirect(request)
resource_path = '%s/%s' % (
resource_path.rstrip('/'),
self.index,
)
else: # filesystem file
# os.path.normpath converts / to \ on windows
resource_path = normcase(normpath(join(self.norm_docroot, path)))
if isdir(resource_path):
if not request.path_url.endswith('/'):
raise self.add_slash_redirect(request)
resource_path = join(resource_path, self.index)
return resource_path
[docs]
def find_resource_path(self, name):
"""
Return the absolute path to the resource or ``None`` if it doesn't
exist.
"""
if self.package_name:
if resource_exists(self.package_name, name):
return resource_filename(self.package_name, name)
elif exists(name):
return name
[docs]
def get_possible_files(self, resource_name):
""" Return a sorted list of ``(size, encoding, path)`` entries."""
result = self.filemap.get(resource_name)
if result is not None:
return result
# XXX we could put a lock around this work but worst case scenario a
# couple requests scan the disk for files at the same time and then
# the cache is set going forward so do not bother
result = []
# add the identity
path = self.find_resource_path(resource_name)
if path:
result.append((path, None))
# add each file we find for the supported encodings
# we don't mind adding multiple files for the same encoding if there
# are copies with different extensions because we sort by size so the
# smallest is always found first and the rest ignored
for encoding, extensions in self.content_encodings.items():
for ext in extensions:
encoded_name = resource_name + ext
path = self.find_resource_path(encoded_name)
if path:
result.append((path, encoding))
# sort the files by size, smallest first
result.sort(key=lambda x: getsize(x[0]))
# only cache the results if reload is disabled
if not self.reload:
self.filemap[resource_name] = result
return result
[docs]
def find_best_match(self, request, files):
""" Return ``(path | None, encoding)``."""
# if the client did not specify encodings then assume only the
# identity is acceptable
if not request.accept_encoding:
identity_path = next(
(path for path, encoding in files if encoding is None),
None,
)
return identity_path, None
# find encodings the client will accept
acceptable_encodings = {
x[0]
for x in request.accept_encoding.acceptable_offers(
[encoding for path, encoding in files if encoding is not None]
)
}
acceptable_encodings.add(None)
# return the smallest file from the acceptable encodings
# we know that files is sorted by size, smallest first
for path, encoding in files:
if encoding in acceptable_encodings:
return path, encoding
return None, None
def add_slash_redirect(self, request):
url = request.path_url + '/'
qs = request.query_string
if qs:
url = url + '?' + qs
return HTTPMovedPermanently(url)
def _compile_content_encodings(encodings):
"""
Convert mimetypes.encodings_map into a dict of
``(encoding) -> [file extensions]``.
"""
result = {}
for ext, encoding in mimetypes.encodings_map.items():
if encoding in encodings:
result.setdefault(encoding, []).append(ext)
return result
def _add_vary(response, option):
vary = response.vary or []
if not any(x.lower() == option.lower() for x in vary):
vary.append(option)
response.vary = vary
_seps = {'/', os.sep}
def _contains_slash(item):
for sep in _seps:
if sep in item:
return True
_has_insecure_pathelement = {'..', '.', ''}.intersection
@lru_cache(1000)
def _secure_path(path_tuple):
if _has_insecure_pathelement(path_tuple):
# belt-and-suspenders security; this should never be true
# unless someone screws up the traversal_path code
# (request.subpath is computed via traversal_path too)
return None
if any([_contains_slash(item) for item in path_tuple]):
return None
encoded = '/'.join(path_tuple) # will be unicode
return encoded
[docs]
class QueryStringCacheBuster:
"""
An implementation of :class:`~pyramid.interfaces.ICacheBuster` which adds
a token for cache busting in the query string of an asset URL.
The optional ``param`` argument determines the name of the parameter added
to the query string and defaults to ``'x'``.
To use this class, subclass it and provide a ``tokenize`` method which
accepts ``request, pathspec, kw`` and returns a token.
.. versionadded:: 1.6
"""
def __init__(self, param='x'):
self.param = param
def __call__(self, request, subpath, kw):
token = self.tokenize(request, subpath, kw)
query = kw.setdefault('_query', {})
if isinstance(query, dict):
query[self.param] = token
else:
kw['_query'] = tuple(query) + ((self.param, token),)
return subpath, kw
[docs]
class QueryStringConstantCacheBuster(QueryStringCacheBuster):
"""
An implementation of :class:`~pyramid.interfaces.ICacheBuster` which adds
an arbitrary token for cache busting in the query string of an asset URL.
The ``token`` parameter is the token string to use for cache busting and
will be the same for every request.
The optional ``param`` argument determines the name of the parameter added
to the query string and defaults to ``'x'``.
.. versionadded:: 1.6
"""
def __init__(self, token, param='x'):
super().__init__(param=param)
self._token = token
def tokenize(self, request, subpath, kw):
return self._token
[docs]
class ManifestCacheBuster:
"""
An implementation of :class:`~pyramid.interfaces.ICacheBuster` which
uses a supplied manifest file to map an asset path to a cache-busted
version of the path.
The ``manifest_spec`` can be an absolute path or a :term:`asset
specification` pointing to a package-relative file.
The manifest file is expected to conform to the following simple JSON
format:
.. code-block:: json
{
"css/main.css": "css/main-678b7c80.css",
"images/background.png": "images/background-a8169106.png",
}
By default, it is a JSON-serialized dictionary where the keys are the
source asset paths used in calls to
:meth:`~pyramid.request.Request.static_url`. For example:
.. code-block:: pycon
>>> request.static_url('myapp:static/css/main.css')
"http://www.example.com/static/css/main-678b7c80.css"
The file format and location can be changed by subclassing and overriding
:meth:`.parse_manifest`.
If a path is not found in the manifest it will pass through unchanged.
If ``reload`` is ``True`` then the manifest file will be reloaded when
changed. It is not recommended to leave this enabled in production.
If the manifest file cannot be found on disk it will be treated as
an empty mapping unless ``reload`` is ``False``.
.. versionadded:: 1.6
"""
exists = staticmethod(exists) # testing
getmtime = staticmethod(getmtime) # testing
def __init__(self, manifest_spec, reload=False):
package_name = caller_package().__name__
self.manifest_path = abspath_from_asset_spec(
manifest_spec, package_name
)
self.reload = reload
self._mtime = None
if not reload:
self._manifest = self.get_manifest()
def get_manifest(self):
with open(self.manifest_path, 'rb') as fp:
return self.parse_manifest(fp.read())
[docs]
def parse_manifest(self, content):
"""
Parse the ``content`` read from the ``manifest_path`` into a
dictionary mapping.
Subclasses may override this method to use something other than
``json.loads`` to load any type of file format and return a conforming
dictionary.
"""
return json.loads(content.decode('utf-8'))
@property
def manifest(self):
""" The current manifest dictionary."""
if self.reload:
if not self.exists(self.manifest_path):
return {}
mtime = self.getmtime(self.manifest_path)
if self._mtime is None or mtime > self._mtime:
self._manifest = self.get_manifest()
self._mtime = mtime
return self._manifest
def __call__(self, request, subpath, kw):
subpath = self.manifest.get(subpath, subpath)
return (subpath, kw)